Reversing a remote with python and an oscilloscope (Part II)

10/09/2016 on ✒ Blog | tags : hardware hacking, electronics, reverse-engineering

0x3: from signal to actual bits - python code

To process the values inside the csv files which we previously downloaded from the scope we'll write a python program.

The program will flow as follows:

if __name__ == "__main__":
    printBanner()
    p = argparse.ArgumentParser(description='Extracts values from Rigol CSV.')

    p.add_argument('file', metavar='file', type=str, help='File to process.')
    p.add_argument('-o', '--outfile', nargs='?', type=str, help='Binary output to file.')
    p.add_argument('-v', '--verbosity', action='store_true', help="Print extra information to stdout.")
    p.add_argument('-b', '--batch', type=str, help="Process all csv files in target dir.")
    p.add_argument('-r', '--reverse', action='store_true',
                   help="Assign '1' symbol to short pulses (Defaults to short == '0').")
    p.add_argument('-e', '--endian', action='store_true',
                   help="Toggle endianness.")
    # ToDo: Recursive mode.
    args = p.parse_args()

    print("[+] Opening wave file %s" % args.file)
    samples = read_rigol_csv(args.file)
    packet = parseWave(samples, args.verbosity, args.reverse, args.endian)

    if args.outfile:
        outputToFile(args.outfile, packet)

    if args.verbosity:
        printToStdout(packet)

    print("[+] Done.\n")

After parsing the arguments passed to our program, we'll decide what to do with them.

The first function to call will be read_rigol_csv(); this function will dump the csv files into a list object:

def read_rigol_csv(fname, channel=1):
    raw_samples = []

    with open(fname, 'rt') as csvfile:
        c = csv.reader(csvfile)

        for row_num, row in enumerate(c):
            if row_num > 2:
                raw_samples.append(float(row[1]))

    return raw_samples

The returned object will be passed as argument to parseWave(), which will decode the voltage values into logic ones and zeroes:

def parseWave(samples, verbosity, reverse, endian):
    decodedBinaryString = ""
    preambleStart = 0
    preambleEnd = 0
    preambleSize = 0
    longPulseDuration = 0
    shortPulseDuration = 0
    packet = []

    # Detect preamble start
    # (!) vHIGH may need to be tuned to achieve proper decoding
    for sample in samples:
        if sample < vHIGH:
            if verbosity:
                print("[+] Preamble start detected! Sample: %d" % preambleStart)
            break
        else:
            preambleStart += 1

    # Detect preamble end
    for sample in samples[preambleStart:]:
        if sample > vLOW:
            preambleEnd = preambleStart + preambleSize
            shortPulseDuration = preambleSize / 11
            longPulseDuration = preambleSize / 3

            if verbosity:
                print("[+] Preamble end detected! Sample: %d" % preambleEnd)
                print(" |------- Preamble duration: %d samples" % preambleSize)
                print(" |---- Short pulse duration: %d samples" % (shortPulseDuration))
                print(" |----  Long pulse duration: %d samples" % (longPulseDuration))

            # Sanity check
            if (shortPulseDuration == 0) or (longPulseDuration == 0) or (preambleSize == 0):
                print("[e] Zero value detected, bad threshold values? Quitting...")
                exit(-1)
            else:
                print("[+] Decoding...")
                break
        else:
            preambleSize += 1

    # Start parsing
    # numSamplesAct will hold the number or read samples prior to a high-to-low transition
    numSamplesAct = 0

    for sample in samples[preambleEnd:]:
        if sample > vHIGH:
            numSamplesAct += 1
        else:
            decoded = decode(numSamplesAct, longPulseDuration, reverse)

            if decoded is not None:
                decodedBinaryString += decoded

            numSamplesAct = 0

    # Rip off START bit
    decodedBinaryString = decodedBinaryString[1:]

    i = 0
    currentByte = ""

    print("[+] Decoded %s bytes" % (len(decodedBinaryString) / 8.0))

    for bit in decodedBinaryString:
        if i < 7:
            currentByte += bit
            i += 1
        else:
            currentByte += bit
            if endian:
                currentByte = currentByte[::-1]
            packet.append(struct.pack("B", int(currentByte, 2)))

            currentByte = ""
            i = 0
    return packet

So this function does the juicy stuff:

First we have to detect the preamble; by observing the captures taken on the previous post, we can see that the voltage is normally high (~3v) and the preamble is just a bunch of low voltage samples; what we are going to do is to detect both start and end samples just by looping from the start and comparing the current sample with certain values that I have defined as global:

THRESHOLD_VOLTAGE = 1.5
THRESHOLD_SAMPLES = 10
vHIGH = 2
vLOW = 1

So, once we have detected both preamble start and end, we can use that information to infer the length of our pulses, as we learned with the oscilloscope on the previous post:

for sample in samples[preambleStart:]:
    if sample > vLOW:
        preambleEnd = preambleStart + preambleSize
        shortPulseDuration = preambleSize / 11
        longPulseDuration = shortPulseDuration * 3

With those values we loop through the rest of the samples accumulating the number of vHigh samples (i.e. how much samples does it take to get to a high-to-low change).

for sample in samples[preambleEnd:]:
    if sample > vHIGH:
        numSamplesAct += 1
    else:
        decoded = decode(numSamplesAct, longPulseDuration, reverse)

        if decoded is not None:
            decodedBinaryString += decoded

        numSamplesAct = 0

Then we call the decode() function which will take the accumulated samples as an argument, as well as how much samples a long pulse lasts and the reverse flag to indicate if a long pulse is interpreted as a logic one or as a logic zero:

def decode(num_samples, longPulseDuration, reverse):
    if num_samples == 0:
        return

    # Using XOR to reverse symbol assignation
    if (num_samples < (longPulseDuration - longPulseDuration / 2.5)) ^ reverse:
        return '0'
    else:
        return '1'

The previous 'if' statement decides whether to assign a one or a zero, xored with the 'reverse' flag (which effectively reverts the comparison result when set to '1' but does nothing when set to '0'). The (long-long/2.5) calculation is more or less accurate by taking noise variability into account; the number of samples will almost never be exactly the expected because the transmission medium nor the transmitter or receiver device are perfect.

Once we have decided to which logic level our samples get translated, we just group our bits in 8 element groups, pack them in a list and off we go!

for bit in decodedBinaryString:
    if i < 7:
        currentByte += bit
        i += 1
    else:
        currentByte += bit
        if endian:
            currentByte = currentByte[::-1]
        packet.append(struct.pack("B", int(currentByte, 2)))

        currentByte = ""
        i = 0

Now we can add a little eye candy to nicely print what we've decoded or save it to a file:

def printToStdout(packet):
    # Print human friendly binary string
    for byte in packet:

        # Unpacked byte
        unpacked = struct.unpack("B", byte)[0]

        # ASCII bin representation
        binascii = (bin(int(unpacked)))[2:].zfill(8)

        # ASCII hex representation
        hexascii = "0x" + hex(unpacked)[2:].zfill(2)

        print(" |-----[%s]  |  %s" % (binascii, hexascii))

And voila! Our csv gets decoded and we can work on reverse engineer the protocol.

The full code as well as some example captures is hosted on github: https://github.com/n0w/acreversing