472 lines
17 KiB
Python
472 lines
17 KiB
Python
"""STM32 MCU serial firmware loader"""
|
|
|
|
import time
|
|
import argparse
|
|
import serial
|
|
|
|
|
|
VERSION_STR = "stm32bl v0.0.0"
|
|
|
|
DESCRIPTION_STR = VERSION_STR + """
|
|
(c) 2016 by pavel.revak@gmail.com
|
|
https://github.com/pavelrevak/stm32bl
|
|
"""
|
|
|
|
|
|
class Stm32BLException(Exception):
|
|
"""General STM32 loader exception"""
|
|
|
|
class SerialException(Stm32BLException):
|
|
"""Serial communication Exception"""
|
|
|
|
class ConnectingException(Stm32BLException):
|
|
"""Connecting to boot-loader exception"""
|
|
|
|
class NoAnswerException(Stm32BLException):
|
|
"""No answer exception"""
|
|
|
|
class CommandNotAllowedException(Stm32BLException):
|
|
"""Command not allowed exception"""
|
|
|
|
class UnexpectedAnswerException(Stm32BLException):
|
|
"""Unexpected answer exception"""
|
|
|
|
class NoAckException(Stm32BLException):
|
|
"""General No ACK exception"""
|
|
|
|
class NoAckCommandException(NoAckException):
|
|
"""No ACK after command exception"""
|
|
|
|
class NoAckDataException(NoAckException):
|
|
"""No ACK after data exception"""
|
|
|
|
|
|
class Stm32bl():
|
|
"""STM32 firmware loader class"""
|
|
|
|
CMD_INIT = 0x7f
|
|
CMD_ACK = 0x79
|
|
CMD_NOACK = 0x1f
|
|
CMD_GET = 0x00
|
|
CMD_GET_VERSION = 0x01
|
|
CMD_GET_ID = 0x02
|
|
CMD_READ_MEMORY = 0x11
|
|
CMD_GO = 0x21
|
|
CMD_WRITE_MEMORY = 0x31
|
|
CMD_ERASE = 0x43
|
|
CMD_EXTENDED_ERASE = 0x44
|
|
CMD_WRITE_PROTECT = 0x63
|
|
CMD_WRITE_UNPROTECT = 0x73
|
|
CMD_READOUT_PROTECT = 0x82
|
|
CMD_READOUT_UNPROTECT = 0x92
|
|
|
|
FLASH_START = 0x08000000
|
|
|
|
def __init__(self, port, baudrate=19200, verbosity=1):
|
|
try:
|
|
self._serial_port = serial.Serial(
|
|
port=port,
|
|
baudrate=baudrate,
|
|
parity=serial.PARITY_EVEN,
|
|
stopbits=1,
|
|
timeout=1
|
|
)
|
|
except (FileNotFoundError, serial.serialutil.SerialException):
|
|
raise SerialException("Error opening serial port: %s" % port)
|
|
self._verbosity = verbosity
|
|
self._connect(5)
|
|
self._allowed_commands = [self.CMD_GET, ]
|
|
self._boot_version = self._cmd_get()
|
|
self._option_bytes = self._cmd_get_version()
|
|
self._dev_id = self._cmd_get_id()
|
|
|
|
@staticmethod
|
|
def print_buffer(addr, data, bytes_per_line=16):
|
|
"""print buffer"""
|
|
prev_chunk = []
|
|
same_chunk = False
|
|
for i in range(0, len(data), bytes_per_line):
|
|
chunk = data[i:i + bytes_per_line]
|
|
if prev_chunk != chunk:
|
|
print('%08x %s%s %s' % (
|
|
addr,
|
|
' '.join(['%02x' % d for d in chunk]),
|
|
' ' * (16 - len(chunk)),
|
|
''.join([chr(d) if d >= 32 and d < 127 else '.' for d in chunk]),
|
|
))
|
|
prev_chunk = chunk
|
|
same_chunk = False
|
|
elif not same_chunk:
|
|
print('*')
|
|
same_chunk = True
|
|
addr += len(chunk)
|
|
print('%08x' % addr)
|
|
|
|
def log(self, message, operation=None, level=1):
|
|
"""logging printing"""
|
|
if self._verbosity < level:
|
|
return
|
|
msg = ''
|
|
if level > 0:
|
|
msg += ':' * level
|
|
if operation:
|
|
msg += '%s: ' % operation
|
|
msg += message
|
|
print(msg)
|
|
|
|
def _write(self, data):
|
|
"""Write data to serial port"""
|
|
self.log(":".join(['%02x' % d for d in data]), 'WR', level=3)
|
|
self._serial_port.write(bytes(data))
|
|
|
|
def _read(self, cnt=1, timeout=1):
|
|
"""Read data from serial port"""
|
|
data = []
|
|
while not data and timeout > 0:
|
|
data = list(self._serial_port.read(cnt))
|
|
timeout -= 1
|
|
self.log(":".join(['%02x' % d for d in data]), 'RD', level=3)
|
|
return data
|
|
|
|
def _reset_mcu(self):
|
|
"""Reset MCU"""
|
|
self._serial_port.setDTR(0)
|
|
time.sleep(0.1)
|
|
self._serial_port.setDTR(1)
|
|
time.sleep(0.2)
|
|
|
|
def _connect(self, repeat=1):
|
|
"""connect to boot-loader"""
|
|
self.log("Connecting to boot-loader", level=1)
|
|
self._serial_port.setRTS(0)
|
|
self._reset_mcu()
|
|
while repeat:
|
|
self._write([self.CMD_INIT])
|
|
ret = self._read()
|
|
if ret and ret[0] in (self.CMD_ACK, self.CMD_NOACK):
|
|
return
|
|
repeat -= 1
|
|
raise ConnectingException("Can't connect to MCU boot-loader.")
|
|
|
|
def exit_bootloader(self):
|
|
"""Exit boot-loader and restart MCU"""
|
|
self._serial_port.setRTS(1)
|
|
self._reset_mcu()
|
|
|
|
def _talk(self, data_wr, cnt_rd, timeout=1):
|
|
"""talk with boot-loader"""
|
|
if isinstance(data_wr, (tuple, list)):
|
|
xor = data_wr[0]
|
|
for i in data_wr[1:]:
|
|
xor ^= i
|
|
data_wr.append(xor)
|
|
else:
|
|
data_wr = [data_wr, data_wr ^ 0xff]
|
|
self._write(data_wr)
|
|
res = self._read(cnt_rd, timeout=timeout)
|
|
if not res:
|
|
raise NoAnswerException("No answer.")
|
|
return res
|
|
|
|
def _send_command(self, cmd, cnt_rd=None):
|
|
"""send command to boot-loader"""
|
|
if cmd not in self._allowed_commands:
|
|
raise CommandNotAllowedException("command %02x: is not supported by this device." % cmd)
|
|
if cnt_rd is None:
|
|
cnt_rd = 1
|
|
else:
|
|
cnt_rd += 2
|
|
res = self._talk(cmd, cnt_rd)
|
|
if res[0] != self.CMD_ACK or res[-1] != self.CMD_ACK:
|
|
raise NoAckCommandException("NoACK for command.")
|
|
return res[1:-1]
|
|
|
|
def _send_data(self, data, cnt_rd=None, timeout=1):
|
|
"""send command to boot-loader"""
|
|
res = self._talk(data, 1, timeout=timeout)
|
|
if res[0] != self.CMD_ACK:
|
|
raise NoAckDataException("NoACK for data.")
|
|
if cnt_rd is not None:
|
|
return self._read(cnt_rd, timeout=timeout)
|
|
|
|
@staticmethod
|
|
def _convert_version(ver):
|
|
return 'v%d.%d' % (ver // 16, ver % 16)
|
|
|
|
@staticmethod
|
|
def _convert_32bit(val):
|
|
return [
|
|
val >> 24,
|
|
0xff & (val >> 16),
|
|
0xff & (val >> 8),
|
|
0xff & val,
|
|
]
|
|
|
|
@staticmethod
|
|
def _convert_16bit(val):
|
|
return [
|
|
val >> 8,
|
|
0xff & val,
|
|
]
|
|
|
|
def _cmd_get(self):
|
|
"""Gets the version and the allowed commands supported
|
|
by the current version of the boot-loader"""
|
|
self.log("CMD_GET", level=2)
|
|
res = self._send_command(self.CMD_GET, 13)
|
|
if len(res) - 2 != res[0]:
|
|
raise UnexpectedAnswerException("CMD_GET command: wrong result length.")
|
|
boot_version = self._convert_version(res[1])
|
|
self.log(boot_version, 'BOOT_VERSION', level=1)
|
|
# update list of allowed commands
|
|
self._allowed_commands = res[2:]
|
|
return boot_version
|
|
|
|
def _cmd_get_version(self):
|
|
"""Gets the boot-loader version and the Read Protection
|
|
status of the Flash memory"""
|
|
self.log("CMD_GET_VERSION", level=2)
|
|
res = self._send_command(self.CMD_GET_VERSION, 3)
|
|
if len(res) != 3:
|
|
raise UnexpectedAnswerException("CMD_GET_VERSION: wrong length of result")
|
|
boot_version = self._convert_version(res[0])
|
|
if boot_version != self._boot_version:
|
|
raise UnexpectedAnswerException("Version between GET and GET_VERSION are different.")
|
|
option_bytes = res[1:]
|
|
self.log(":".join(['%02x' % i for i in option_bytes]), 'OPTION_BYTES', level=1)
|
|
return option_bytes
|
|
|
|
def _cmd_get_id(self):
|
|
"""Gets the chip ID"""
|
|
self.log("CMD_GET_ID", level=2)
|
|
res = self._send_command(self.CMD_GET_ID, 3)
|
|
if len(res) - 2 != res[0]:
|
|
raise UnexpectedAnswerException("CMD_GET_ID: wrong result length.")
|
|
dev_id = (res[1] << 8) + res[2]
|
|
self.log("%04x" % dev_id, 'DEV_ID', level=1)
|
|
return dev_id
|
|
|
|
def _cmd_read_memory(self, address, length):
|
|
"""Reads up to 256 bytes of memory starting from an
|
|
address specified by the application"""
|
|
self.log("CMD_READ_MEMORY(%08x, %d)" % (address, length), level=2)
|
|
self._send_command(self.CMD_READ_MEMORY)
|
|
self._send_data(self._convert_32bit(address))
|
|
return self._send_data(length - 1, length)
|
|
|
|
def cmd_go(self, address):
|
|
"""Jumps to user application code located in the internal
|
|
Flash memory or in SRAM"""
|
|
self.log("CMD_GO", level=2)
|
|
self._send_command(self.CMD_GO)
|
|
self._send_data(self._convert_32bit(address))
|
|
|
|
def _cmd_write_memory(self, address, data):
|
|
"""Writes up to 256 bytes to the RAM or Flash memory
|
|
starting from an address specified by the application"""
|
|
self.log("CMD_WRITE_MEMORY(%08x, %d)" % (address, len(data)), level=2)
|
|
self._send_command(self.CMD_WRITE_MEMORY)
|
|
self._send_data(self._convert_32bit(address))
|
|
return self._send_data([len(data) - 1] + data)
|
|
|
|
def _cmd_erase(self, pages=0xff):
|
|
"""Erases from one to all the Flash memory pages"""
|
|
self.log("CMD_ERASE(%d)" % pages, level=2)
|
|
self._send_command(self.CMD_ERASE)
|
|
if isinstance(pages, (list, tuple)):
|
|
data = [len(pages) - 1]
|
|
for page in pages:
|
|
data.append(page)
|
|
else:
|
|
data = pages
|
|
self._send_data(data, timeout=20)
|
|
|
|
def _cmd_extended_erase(self, pages=0xffff):
|
|
"""Erases from one to all the Flash memory pages using
|
|
two byte addressing mode (available only for v3.0 usart
|
|
bootloader versions and above)"""
|
|
self.log("CMD_EXTENDED_ERASE", level=2)
|
|
self._send_command(self.CMD_EXTENDED_ERASE)
|
|
if isinstance(pages, (list, tuple)):
|
|
data = self._convert_16bit(len(pages) - 1)
|
|
for page in pages:
|
|
data += self._convert_16bit(page)
|
|
else:
|
|
data = self._convert_16bit(0xffff)
|
|
self._send_data(data, timeout=20)
|
|
|
|
def cmd_write_protect(self, sectors):
|
|
"""Enables the write protection for some sectors"""
|
|
self.log("CMD_WRITE_PROTECT", level=2)
|
|
data = [len(sectors) - 1]
|
|
for sector in sectors:
|
|
data.append(sector)
|
|
self._send_data(data, timeout=20)
|
|
self._connect(5)
|
|
|
|
def cmd_write_unprotect(self):
|
|
"""Disables the write protection for all Flash memory sectors"""
|
|
self.log("CMD_WRITE_UNPROTECT", level=2)
|
|
self._send_command(self.CMD_WRITE_UNPROTECT, 0)
|
|
self._connect(5)
|
|
|
|
def cmd_readout_protect(self):
|
|
"""Enables the read protection"""
|
|
self.log("CMD_READOUT_PROTECT", level=2)
|
|
self._send_command(self.CMD_READOUT_PROTECT, 0)
|
|
self.log("Set readout protection, device is restarted", level=1)
|
|
self._connect(5)
|
|
|
|
def cmd_readout_unprotect(self):
|
|
"""Disables the read protection"""
|
|
self.log("CMD_READOUT_UNPROTECT", level=2)
|
|
self._send_command(self.CMD_READOUT_UNPROTECT, 0)
|
|
self.log("Removed readout protection, device is restarted", level=1)
|
|
self._connect(5)
|
|
|
|
def read_memory(self, address, size=None):
|
|
"""read memory"""
|
|
mem = []
|
|
if size is None:
|
|
self.log("address=0x%08x" % address, 'READ_MEMORY', level=1)
|
|
while True:
|
|
try:
|
|
mem += self._cmd_read_memory(address, 256)
|
|
except NoAckDataException:
|
|
self._read()
|
|
break
|
|
address += 256
|
|
self.log("done (%d Bytes)" % len(mem), 'READ_MEMORY', level=1)
|
|
else:
|
|
self.log("from 0x%08x (%d Bytes)" % (address, size), 'READ_MEMORY', level=1)
|
|
while size > 0:
|
|
_rd_size = size
|
|
if size > 256:
|
|
_rd_size = 256
|
|
size -= _rd_size
|
|
mem += self._cmd_read_memory(address, _rd_size)
|
|
address += _rd_size
|
|
self.log("done", 'READ_MEMORY', level=1)
|
|
return mem
|
|
|
|
def write_memory(self, address, data):
|
|
"""write memory"""
|
|
self.log("from 0x%08x (%d Bytes)" % (address, len(data)), 'WRITE_MEMORY', level=1)
|
|
_data = data[:]
|
|
while _data:
|
|
self._cmd_write_memory(address, _data[:256])
|
|
address += 256
|
|
_data = _data[256:]
|
|
self.log("done", 'WRITE_MEMORY', level=1)
|
|
|
|
def write_file(self, address, file_name, verify=False):
|
|
"""Write file and or verify"""
|
|
binfile = open(file_name, 'rb')
|
|
mem = list(binfile.read())
|
|
size = len(mem)
|
|
if size % 4:
|
|
mem += [0] * (size % 4)
|
|
size = len(mem)
|
|
self.write_memory(address, mem)
|
|
if not verify:
|
|
return
|
|
addr = address
|
|
mem_verify = self.read_memory(address, size)
|
|
_errors = 0
|
|
for data_a, data_b in zip(mem, mem_verify):
|
|
if data_a != data_b:
|
|
if _errors < 10:
|
|
self.log("0x%08x: 0x%02x != 0x%02x" % (addr, data_a, data_b), 'VERIFY', level=0)
|
|
_errors += 1
|
|
addr += 1
|
|
if _errors >= 10:
|
|
self.log(".. %d errors" % _errors, 'VERIFY', level=0)
|
|
else:
|
|
self.log("OK", 'VERIFY', level=1)
|
|
|
|
def mass_erase(self):
|
|
"""Mass erase"""
|
|
self.log("MASS_ERASE", level=1)
|
|
if self.CMD_ERASE in self._allowed_commands:
|
|
self._cmd_erase()
|
|
return
|
|
try:
|
|
self._cmd_extended_erase()
|
|
except NoAckException:
|
|
# some chips don't support mass erase
|
|
# protect and unprotect also make chip erase
|
|
try:
|
|
self.cmd_readout_protect()
|
|
except NoAckException:
|
|
# chip is already protected
|
|
pass
|
|
self.cmd_readout_unprotect()
|
|
|
|
def erase_blocks(self, blocks):
|
|
"""Mass erase"""
|
|
blocks = sorted(set(blocks))
|
|
self.log(",".join([str(b) for b in blocks]), 'ERASE_BLOCKS', level=1)
|
|
if self.CMD_ERASE in self._allowed_commands:
|
|
self._cmd_erase(blocks)
|
|
return
|
|
self._cmd_extended_erase(blocks)
|
|
|
|
|
|
def main():
|
|
"""Main application"""
|
|
parser = argparse.ArgumentParser(description=DESCRIPTION_STR)
|
|
parser.add_argument('-V', '--version', action='version', version=VERSION_STR)
|
|
parser.add_argument('-v', '--verbose', action='count', help="increase verbosity *", default=0)
|
|
parser.add_argument('-p', '--port', help="Serial port eg: /dev/ttyS0 or COM1", required=True)
|
|
parser.add_argument('-b', '--baud', help="Baud-rate (9600 - 115200)", default=115200)
|
|
parser.add_argument('-a', '--address', help="Set address for reading or writing")
|
|
parser.add_argument('-s', '--size', help="Set size for reading")
|
|
parser.add_argument('-r', '--read', help="Read content of memory to file")
|
|
parser.add_argument('-d', '--dump', action='store_true', help="Dump content of memory")
|
|
parser.add_argument('-m', '--mass-erase', action='store_true', help="Mass erase before writing")
|
|
parser.add_argument('-e', '--erase-block', type=int, action='append', help="Erase block *")
|
|
parser.add_argument('-w', '--write', action='append', help="Write file to memory *")
|
|
parser.add_argument('-f', '--verify', action='store_true', help="Verify after writing")
|
|
parser.add_argument('-x', '--execute', action='store_true', help="Start application")
|
|
parser.add_argument('-t', '--reset', action='store_true', help="Reset MCU and exit boot-loader")
|
|
parser.add_argument('-W', '--write-protect', type=int, action='append', help="WP sector *")
|
|
parser.add_argument('-U', '--write-unprotect', action='store_true', help="Write unprotect all")
|
|
parser.add_argument('-R', '--read-protect', action='store_true', help="Read Protect")
|
|
parser.add_argument('-T', '--read-unprotect', action='store_true', help="Read unprotect")
|
|
args = parser.parse_args()
|
|
address = int(args.address, 0) if args.address is not None else Stm32bl.FLASH_START
|
|
size = int(args.size, 0) if args.size is not None else None
|
|
|
|
try:
|
|
stm32bl = Stm32bl(port=args.port, baudrate=args.baud, verbosity=args.verbose)
|
|
if args.read_unprotect:
|
|
stm32bl.cmd_readout_unprotect()
|
|
if args.write_unprotect:
|
|
stm32bl.cmd_write_unprotect()
|
|
if args.dump or args.read:
|
|
mem = stm32bl.read_memory(address, size)
|
|
if args.dump:
|
|
stm32bl.print_buffer(address, mem)
|
|
if args.read:
|
|
binfile = open(args.read, 'wb')
|
|
binfile.write(bytes(mem))
|
|
if args.mass_erase:
|
|
stm32bl.mass_erase()
|
|
elif args.erase_block:
|
|
stm32bl.erase_blocks(args.erase_block)
|
|
if args.write:
|
|
stm32bl.write_file(address, args.write[0], args.verify)
|
|
if args.write_protect:
|
|
stm32bl.cmd_write_protect(args.write_protect)
|
|
if args.read_protect:
|
|
stm32bl.cmd_readout_protect()
|
|
if args.execute:
|
|
stm32bl.cmd_go(address)
|
|
if args.reset:
|
|
stm32bl.exit_bootloader()
|
|
except Stm32BLException as err:
|
|
raise SerialException("ERROR: %s" % err)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|