I’ve been doing embedded work for so long now that it’s rare that I come across a need that I haven’t run into before. Well, it happened the other day, so I thought I’d share it with you.

Here’s the situation. I have a transducer whose role is to determine a binary condition (essentially X is present / absent). The transducer is AC coupled and pass band filtered such that what I’m looking for is the presence of a noisy AC signal. My first inclination was to take the pass-band filtered signal, rectify it and average it over time. While this worked reasonably well, the delta between X being present / absent was not as large as I’d have liked. Accordingly I investigated other metrics. Somewhat to my surprise I found that the peak signal values (i.e peak X present : peak X absent) gave a far better ratio. Thus I found myself needing to find the peak (i.e. maximum value) of a moving time series of data values.

To prove the concept, I used a brute force approach. That is, every time I received a new reading I’d store it in a buffer, the size of which was determined by the amount of time I wished to look back over. I’d then search the entire buffer looking for the maximum value. This approach of course worked fine – but it was hopelessly inefficient. Given that I am receiving a new value to process every few milliseconds, this inefficiency is unacceptable in the real product. Accordingly, I needed a better algorithm.

My first port of call was of course the internet. If you do a search for peak detector algorithms then you’ll find a plethora of algorithms. However they are all way too sophisticated for what I need, as they are aimed at finding *all* the local maxima within an N-Dimensional array (apparently an important problem in physics, image processing etc). I just want to know what’s the maximum value in a buffer of values – and I want to know it as efficiently as possible.

After pondering the problem for awhile, my thoughts first turned to an efficient median filtering algorithm. I discussed median filtering here. My rationale was quite simple. The efficient median filtering algorithm inherently keeps the time series data sorted and so I should be able to easily adapt it to return a maximum value rather than a median value. Well it turned out to be quite trivial. Indeed it’s so trivial that I wonder why I haven’t modified it before. Here’s the code (note, as explained in my original posting on median filtering, this algorithm and code is mostly courtesy of Phil Ekstrom):

#define STOPPER 0 /* Smaller than any datum */ #define MEDIAN_FILTER_SIZE (31) void median_filter(uint16_t datum, uint16_t *calc_median, uint16_t *calc_peak) { struct pair { struct pair *point; /* Pointers forming list linked in sorted order */ uint16_t value; /* Values to sort */ }; static struct pair buffer[MEDIAN_FILTER_SIZE] = {0}; /* Buffer of nwidth pairs */ static struct pair *datpoint = buffer; /* Pointer into circular buffer of data */ static struct pair small = {NULL, STOPPER}; /* Chain stopper */ static struct pair big = {&small, 0}; /* Pointer to head (largest) of linked list.*/ struct pair *successor; /* Pointer to successor of replaced data item */ struct pair *scan; /* Pointer used to scan down the sorted list */ struct pair *scanold; /* Previous value of scan */ struct pair *median; /* Pointer to median */ uint16_t i; if (datum == STOPPER) { datum = STOPPER + 1; /* No stoppers allowed. */ } if ( (++datpoint - buffer) >= MEDIAN_FILTER_SIZE) { datpoint = buffer; /* Increment and wrap data in pointer.*/ } datpoint->value = datum; /* Copy in new datum */ successor = datpoint->point; /* Save pointer to old value's successor */ median = &big; /* Median initially to first in chain */ scanold = NULL; /* Scanold initially null. */ scan = &big; /* Points to pointer to first (largest) datum in chain */ /* Handle chain-out of first item in chain as special case */ if (scan->point == datpoint) { scan->point = successor; } scanold = scan; /* Save this pointer and */ scan = scan->point ; /* step down chain */ /* Loop through the chain, normal loop exit via break. */ for (i = 0 ; i < MEDIAN_FILTER_SIZE; ++i) { /* Handle odd-numbered item in chain */ if (scan->point == datpoint) { scan->point = successor; /* Chain out the old datum.*/ } if (scan->value < datum) /* If datum is larger than scanned value,*/ { datpoint->point = scanold->point; /* Chain it in here. */ scanold->point = datpoint; /* Mark it chained in. */ datum = STOPPER; }; /* Step median pointer down chain after doing odd-numbered element */ median = median->point; /* Step median pointer. */ if (scan == &small) { break; /* Break at end of chain */ } scanold = scan; /* Save this pointer and */ scan = scan->point; /* step down chain */ /* Handle even-numbered item in chain. */ if (scan->point == datpoint) { scan->point = successor; } if (scan->value < datum) { datpoint->point = scanold->point; scanold->point = datpoint; datum = STOPPER; } if (scan == &small) { break; } scanold = scan; scan = scan->point; } *calc_median = median->value; *calc_peak = big.point->value; }

To use this code, simply set the MEDIAN_FILTER_SIZE to your desired value, and then call median_filter every time you have a new value to process. The advantage of this code is that it gives you two parameters – the median and the maximum.

However, when I bench marked it against the brute force approach I discovered that the brute force algorithm was actually faster – by a reasonable amount. After reflecting upon this for awhile I realized that the median filtering approach was of course keeping all of the data sorted – which was way more than I needed for a simple peak detector. Thus it was time for another approach.

After thinking about it for awhile, it was clear that the only datum I cared about in my buffer was the current maximum value. Thus upon receipt of a new value, I essentially had to decide:

- Is this new value >= current maximum value? If it is then it’s the new maximum value. Note that I really do mean >= and not >. The reason is explained below.
- Otherwise, is this new value about to overwrite the existing maximum value? If so, overwrite it and use a brute force search to find the new maximum value.

In order to make the algorithm as efficient as possible we need to minimize the number of times a brute force search is required. Now if your buffer contains only one incidence of the maximum value, then you’ll simply have to do the brute force search when you replace the maximum value. However, if your buffer contains multiple copies of the maximum value, then you’ll minimize the number of brute force searches by declaring the operative maximum value as the one furthest away from where we will write to in the buffer next. It’s for this reason that we use a >= comparison in step 1.

In a similar vein, if we have to do a brute force search then if there are multiple copies of the maximum value then once again we want to choose the maximum value furthest away from where we will write to in the buffer next.

Anyway, here’s the code

#define WINDOW_SIZE 31 uint16_t efficient_peak_detector(uint16_t value) { static uint16_t wbuffer[WINDOW_SIZE] = {0U}; static uint16_t wr_idx = 0U; static uint16_t peak_value = 0U; static uint16_t peak_value_idx = 0U; if (value >= peak_value) { peak_value = value; /* New peak value, so record it */ peak_value_idx = wr_idx; wbuffer[wr_idx] = value; } else { wbuffer[wr_idx] = value; /* Not a new peak value, so just store it */ if (wr_idx == peak_value_idx) /* Have we over written the peak value ? */ { /* Yes, so we need to do a brute force search to find the new maximum. Note that for efficiency reasons, if there are multiple values of the new peak value, then we want to chose the one whose index value is as far away as possible from the current index */ uint16_t idx; uint16_t cnt; for (cnt = 0U, idx = wr_idx, peak_value = 0U; cnt < WINDOW_SIZE; ++cnt) { if (wbuffer[idx] >= peak_value) { peak_value = wbuffer[idx]; /* Record new peak */ peak_value_idx = idx; } if (++idx >= WINDOW_SIZE) { idx = 0; } } } } if (++wr_idx >= WINDOW_SIZE) { wr_idx = 0; } return peak_value; }

Obviously, if you make your buffer size a power of two then you can optimize the buffer wrapping code. There are a couple of other minor optimizations that can be made. For example on eight bit processors, making the various indices and counters (wr_idx, peak_value_idx, idx, cnt) 8 bits will speed things up a lot. Similarly on a 32 bit processor you will likely gain a bit by using 32 bit values.

Notwithstanding the aforementioned optimizations, this code is on average considerably faster than the brute force approach and not much larger. Its main limitation is that its run time is highly variable depending upon whether we have to determine a new maximum value using the brute force approach or not.

Clearly it’s trivial to modify this code to perform a ‘trough detector’ (aka minimum detector), or indeed have it do both functions.

<I’ve noticed that comments are shown as closed. I’m not sure why this is, but am working with the system administrator to get it fixed. Apologies>