Помощь - Поиск - Пользователи - Календарь
Полная версия этой страницы: Проблема с Ring Buffer.
Форум разработчиков электроники ELECTRONIX.ru > Сайт и форум > В помощь начинающему > Программирование
Jenya7
Я сделал такую реализацию.
CODE
typedef enum CycBuffState {cbsOk = 0, cbsOverwrite = 1, cbsBuffFull = 2, cbsBuffEmpty = 3, cbsError = 4} enCycBuffState;

typedef struct
{
uint32_t MaxBufSize,
HeadIdx,
TailIdx;
enCycBuffState State;
uint8_t *Buffer;
} stRingBuffer;

void RingBuf_Init (stRingBuffer *ring_buf, uint8_t *buf, uint32_t max_size)
{
ring_buf->Buffer = buf;
ring_buf->MaxBufSize = max_size;
ring_buf->HeadIdx = ring_buf->TailIdx = 0;
ring_buf->State = cbsBuffEmpty;
}

enCycBuffState RingBuf_PushByte(stRingBuffer *ring_buf, uint8_t val, uint8_t allow_overwrite)
{
if ((ring_buf->HeadIdx+1) == ring_buf->TailIdx)
{
if (!allow_overwrite)
return cbsBuffFull;
}

ring_buf->Buffer[ring_buf->HeadIdx] = val;
ring_buf->HeadIdx++;

if (ring_buf->HeadIdx >= ring_buf->MaxBufSize)
ring_buf->HeadIdx = 0;

return cbsOk;
}

enCycBuffState RingBuf_PushBytes(stRingBuffer *ring_buf, uint8_t len, uint8_t *buf, uint8_t allow_overwrite)
{
uint32_t free_space;

if (ring_buf->HeadIdx == ring_buf->TailIdx)
{
free_space = ring_buf->MaxBufSize;
}
else
{
if (ring_buf->HeadIdx >= ring_buf->TailIdx)
free_space = ring_buf->HeadIdx - ring_buf->TailIdx;
else
free_space = (ring_buf->MaxBufSize - ring_buf->TailIdx) + ring_buf->HeadIdx;

if (free_space < len)
return cbsBuffFull;
}
while (len)
{
RingBuf_PushByte(ring_buf, *buf, allow_overwrite);
buf++;
len--;
}

return cbsOk;
}

enCycBuffState RingBuf_PopByte(stRingBuffer *ring_buf, uint8_t *val)
{
if (ring_buf->HeadIdx == ring_buf->TailIdx)
return cbsError;

*val = ring_buf->Buffer[ring_buf->TailIdx];
ring_buf->Buffer[ring_buf->TailIdx] = 0;
ring_buf->TailIdx++;

if (ring_buf->TailIdx >= ring_buf->MaxBufSize)
ring_buf->TailIdx = 0;

return cbsOk;
}

enCycBuffState RingBuf_PopBytes(stRingBuffer *ring_buf, uint8_t len, uint8_t *val)
{
while (len)
{
RingBuf_PopByte(ring_buf, val);
val++;
len--;
}

return cbsOk;
}


И потом проверяю (в IAR, в симуляторе)
Код
#include <string.h>
#include "ringbuf.h"

stRingBuffer ring_buf;

uint8_t *temp_buff;
uint8_t *test_buff = "Hello World";
uint8_t *read_buff;

int main()
{
  RingBuf_Init (&ring_buf, temp_buff, 32);
  while (1)
  {
     RingBuf_PushBytes(&ring_buf, strlen(test_buff), test_buff, 0);
    
     RingBuf_PopBytes(&ring_buf, strlen(test_buff), read_buff);
  }
}

Но несмотря на то что я отслеживаю наличие свободного места.
Код
uint32_t free_space;
  
  if (ring_buf->HeadIdx == ring_buf->TailIdx)
  {
    free_space = ring_buf->MaxBufSize;
  }
  else
  {
    if (ring_buf->HeadIdx >= ring_buf->TailIdx)
       free_space = ring_buf->HeadIdx - ring_buf->TailIdx;
     else
       free_space = (ring_buf->MaxBufSize - ring_buf->TailIdx) + ring_buf->HeadIdx;
  
    if (free_space < len)
      return cbsBuffFull;

В отладчике я вижу перезапись буфера. Что я упустил?
smalcom
Много чего, там и переполнение и... проще показать. Вот вам пример снизу, немного адаптировал из другой программы. Самая высокоуровневая функция - это Write.

CODE
uint32_t BufferFreeSize()
{
if(RdIdx > WrIdx)
return RdIdx - WrIdx;
else
return (BUFFER_SIZE - WrIdx) + RdIdx;
}

bool CheckOverEndIdx(uint32_t& pIdx)
{
if(pIdx > BUFFER_SIZE)
{
pIdx -= (BUFFER_SIZE - 1);

return true;
}
else
{
return false;
}
}

void PushBytes_Internal(const uint8_t* const pDataBlock, const uint16_t pBlockSize)
{
uint16_t bw = 0;// количество записанных байт
uint32_t TempWrIdx = WrIdx;// берём текущее значение индекса записи
Service::RetVal rv;

do
{
uint16_t btw;// количество байт, которые необходимо записать

CheckOverEndIdx(TempWrIdx);
if((TempWrIdx + pBlockSize - bw) > BUFFER_SIZE)
btw = BUFFER_SIZE - TempWrIdx + 1;
else
btw = pBlockSize - bw;

for(uint32_t idx = TempWrIdx, idx_end = idx + btw; idx < idx_end; idx++) BufferData[idx] = pDataBlock[bw++];

TempWrIdx += btw;
CheckOverEndIdx(TempWrIdx);
} while(bw < pBlockSize);

WrIdx = TempWrIdx;
}

bool Write(const uint8_t* const pDataBlock, const uint16_t pBlockSize)
{
if(pBlockSize > BUFFER_SIZE) return false;

if(BufferFreeSize() < (pBlockSize + 1))
{
RdIdx += pBlockSize;
CheckOverEndIdx(RdIdx);
}

// записываем блок данных
PushBytes_Internal(WrIdx, mNPQPack, pBlockSize, &WrIdx);

return false;
}
Jenya7
Цитата(smalcom @ Aug 31 2016, 15:17) *
Много чего, там и переполнение и... проще показать. Вот вам пример снизу, немного адаптировал из другой программы. Самая высокоуровневая функция - это Write.

Но вы все таки разрешаете запись, освобождая место. А я думаю блокировать запись пока данные не будут считаны.
XVR
Код
uint8_t *temp_buff;
uint8_t *read_buff;
Ну и куда они в результате смотрят?
Сделайте так -
Код
uint8_t temp_buff[32];
uint8_t read_buff[32];

Jenya7
Цитата(XVR @ Aug 31 2016, 18:41) *
Код
uint8_t *temp_buff;
uint8_t *read_buff;
Ну и куда они в результате смотрят?
Сделайте так -
Код
uint8_t temp_buff[32];
uint8_t read_buff[32];

да. спасибо. указатели нужно конечно инициализировать.
smalcom
Цитата(Jenya7 @ Aug 31 2016, 13:23) *
Но вы все таки разрешаете запись, освобождая место. А я думаю блокировать запись пока данные не будут считаны.


Цитата
if(BufferFreeSize() < (pBlockSize + 1))
{
RdIdx += pBlockSize;
CheckOverEndIdx(RdIdx);
}


заменить на

Цитата
if(BufferFreeSize() < (pBlockSize + 1)) return;
Jenya7
Цитата(smalcom @ Aug 31 2016, 20:14) *
заменить на

спасибо.
DASM
Чет мудреные какие-то реализации. Так не проще?
CODE
template <class T>
class RingBuf
{
public:
RingBuf (unsigned int size) : buf(new T[size]), head(0), sz_contains(0), SZ_MAX(size){};
~RingBuf() {
delete [] buf;
}
bool Pop (T &val)
{
if (sz_contains == 0)
return false;
else
{
val = buf[head++]; head %= SZ_MAX; sz_contains--;
return true;
}
}
bool Push (T val)
{
if (!GetFree())
return false;
else
{
buf[(head + sz_contains++) % SZ_MAX] = val;
return true;
}
}
bool Push (T val[] , unsigned int count)
{
if (count > GetFree())
return false;
for (unsigned int i = 0; i < count; i++)
Push(val[i]);
return true;
}
void Clear() {sz_contains = 0;}
unsigned int GetFree () {return SZ_MAX - sz_contains;}

private:
unsigned int sz_contains;
const unsigned int SZ_MAX;
T *buf;
unsigned int head;
};
Jenya7
Цитата(DASM @ Aug 31 2016, 20:53) *
Чет мудреные какие-то реализации. Так не проще?

Нет проверки влезет ли записываемый кусок в свободное место.
DASM
Цитата(Jenya7 @ Aug 31 2016, 18:05) *
Нет проверки влезет ли записываемый кусок в свободное место.

У меня поэлементый метод только накидал, счас блок сделаем
Вот
CODE
bool Push (T val[] , unsigned int count)
{
if (count > GetFree())
return false;
for (unsigned int i = 0; i < count; i++)
{
Push(val[i]);
}
return true;
}

Использование тривиально
CODE
const int SZ = 4;
RingBuf<int> r(SZ);
int ar[] = {1,2,3};
bool mres = r.Push(ar, sizeof(ar)/sizeof(ar[0])); // несколько элементов
printf ("push %s \n", mres ? "OK ": "FULL ");
r.Push(val); // один элемент

Pop для нескольких элементов - аналогично
smalcom
Код
buf[(head + sz_contains++) % SZ_MAX] = val;

угроза переполнения "sz_contains".
DASM
Цитата(smalcom @ Aug 31 2016, 21:03) *
Код
buf[(head + sz_contains++) % SZ_MAX] = val;

угроза переполнения "sz_contains".

Откуда ? После 2^32 push??
smalcom
Цитата
После 2^32 push?

я так на волне паранойи и подумал. Но при внимательном рассмотрении - SZ_MAX имеет такой же тип, что и sz_contains.

brag
Хм. Зачем все так сложно мудрить, если есть уже давно простейшая рабочая реализация прописанная в букварях по программированию, остается ее только портировать на свой язык.
При чем тут еще от задачи зависит. Если это SPSC, тогда вообще все просто и без блокировок. Если MPSC/MPMC/SPMC - нужны уже блокировки.
Приведу простейший пример SPSC, написан был мной лет 8 назад(то есть просто реализован на ц++ алгоритм из букваря) и очень часто используется по сей день везде(драйверы, очередя сообщений, итд итп)
CODE

template<class T, int Size> class Fifo_spsc{
public:
static_assert((Size&(Size-1)) == 0, "Size must be power of 2");
Fifo_spsc(){
rx = 0;
wx = 0;
}

// Producer
bool push(const T &v){
uint32_t w = (wx+1)&(Size-1);
if(w==rx)return false; //ERR_FIFO_FULL;
data[wx] = v;
__DMB(); // just a memory barrier - ensure data has been completely stored
wx = w; // make item available for reading
return true;
}

// Consumer
bool pop(T *v){
uint32_t r = rx;
if(r==wx)return false; //ERR_FIFO_EMPTY;
*v = data[r];
__DMB(); // ensure data has been completely loaded
rx = (r+1)&(Size-1); // make item available for writing
return true;
}

// Consumer
void flush(){
__DMB();
rx = wx;
}

protected:
uint16_t rx, wx;
T data[Size];
};

Памяти занимает - собственно сами данные + 2 16-битных слова.
Хоть и c++, что позволяет использовать этот буфер для любых типов - как простого uint_8t, так и для сложных обьектов. Это базовая реализация для практически всех других более сложных очередей.
Один элемент всегда пустой - это делает код гораздо проще и быстрее, и как ни странно зачастую - меньше требовательным к памяти sm.gif

Также, если у нас строго один писатель и один читатель, то можно сделать и чтение не по одному элементу, а сразу массивом.
CODE

// Consumer, use with caution
const T* getConsecutiveData_ptr()const{
if(rx==wx)return 0; // empty
return &data[rx];
}

int getConsecutiveData_len()const{
if(wx>rx)return wx-rx;
else return Size-rx;
}

void popConsecutiveData(unsigned n){
__DMB();
rx = (rx+n)&(Size-1);
}

Важно понимать, что связь между producer и consumer должна быть только через функции этого буфера, без каких либо других переменных, вызовов функций итд в обход этого FIFO. Иначе однозначно рано или поздно схватите гонку.
То есть он подходит, например когда producer - это прерывание, а consumer - вечный цикл. Что для большинства задач достаточно.

Если у нас еvent-driven-движок (то есть без вечных циклов, все на событиях) - тогда нужна более сложная реализация. Но использовать нужно опять же только ее, без всяких сторонних связей.
Если интересно - расскажу и покажу как, я на этих циклических буферах(в том числе с обьектами не фиксированного размера) уже собаку сьел за много лет, и в своих программах использую только их - как самый простой, эффективный и безопасный интерфейс для связи между обьектами.

Но это все C++, зато позволяет эффективно и безопасно работать и писать очень простой код.
Например. Прерывание uart
Код
void U0RxCbk::readyRead(LPC_UART_TypeDef *uart){ // interrupt
    while(Uart::isCharAvailable(uart)){
        bool r = rxfifo.push(Uart::read_char(uart));
        if(!r){
            printf("U0RxCbk::readyRead(): rxfifo full!\n");
        }
    }
}

Приемник - просто функция, которая будет вызвана, когда на это появится время(в порядке приоритета и очереди).
Которая в свою очередь принимает текст NMEA и дальше парсит его.
Код
void Gps::start_receiver(){
    // Устанавливаем обработчик события
    rxfifo.set_onReadyRead([this](){ // собственно сам обработчик
        uint8_t ch;
        while(true){
            bool r = rxfifo.pop(&ch); // читаем из очереди
            parse(ch); // парсим
            if(!r)break; // если очередь стала пустой - выходим
        }
    }, GPS_TASK_PRIORITY); // приоритет обработчика
}


Это более старый вариант, привел, чтобы понять как оно работает, скажем так, внутри.
Сейчас я пользуюсь в основном таким:
Код
class GpsReceiver : public UmFifoSingleReceiver<uint8_t, GpsReceiver, GPS_TASK_PRIORITY>{
protected:
    void onItemReceived(uint8_t ch){
        parse(ch);
    }
};

То есть вся логика чтения из буфера реализована один раз отдельно и используется повторно сколько угодно раз.
Так же есть аналогичные реализации, когда приемник может обрабатывать не один элемент - а массив из нескольких(например отправка через DMA), физически - подряд лежащих в нашем ring-buffere.
Overhed-a нет, наоборот такая реализация на шаблонах работает быстрее, чем обычные сишные sm.gif Большинство работы делает компилятор, там даже вызова функции не будет - компилятор функцию parse встроит в недра самой очереди.
smalcom
самое первое (а там и дальше так)
Цитата
uint32_t w = (wx+1)&(Size-1);

это не кольцевой буфер, а обычный.
brag
Цитата(smalcom @ Sep 7 2016, 20:47) *
это не кольцевой буфер.

А какой?
Или Вас смущает &(Size-1) ? Ну так в данной реализации все заточено под скорость, ессно Size должен быть степенью двойки. В коде также стоит проверка, компилятор всегда проверяет является он степенью двойки или нет:
Код
static_assert((Size&(Size-1)) == 0, "Size must be power of 2");


Иначе придется, как минимум добавлять if, а если совсем все плохо - % (остаток от деления).
smalcom
о, пардон.
странно как-то - следующей строки с проверкой я не увидел. пора в отпуск.
brag
Цитата(smalcom @ Sep 8 2016, 04:43) *
о, пардон.
странно как-то - следующей строки с проверкой я не увидел. пора в отпуск.

Да, действительно, прошу прощения, при вставке кода в браузер и форматировании строка куда-то пропала. исправил.
Jenya7
Цитата(brag @ Sep 7 2016, 13:48) *
Хм. Зачем все так сложно мудрить.....

лично для меня мудрить это
Код
template<class T, int Size> class Fifo_spsc

sm.gif
мне бы по простому. по сишному. sm.gif
brag
Цитата(Jenya7 @ Sep 11 2016, 20:58) *
мне бы по простому. по сишному

переписывать под каждый тип данных все функции работы с очередью
Для просмотра полной версии этой страницы, пожалуйста, пройдите по ссылке.
Invision Power Board © 2001-2025 Invision Power Services, Inc.