41#ifndef MSG_QUE_BLOCK_H_INCLUDED
42#define MSG_QUE_BLOCK_H_INCLUDED
53#include "memutils/common_utils/common_types.h"
54#include "memutils/common_utils/common_assert.h"
55#include "memutils/os_utils/chateau_osal.h"
56#include "memutils/message/cache.h"
57#include "memutils/message/MsgQue.h"
58#include "memutils/message/MsgLog.h"
60#include "SpinLockManager.h"
75 typedef InterCpuLock::SpinLockId SpinLockId;
77 typedef uint16_t SpinLockId;
94 MsgCpuId getOwner()
const {
return m_owner; }
98 bool isShare()
const {
return m_spinlock != 0; }
102 uint16_t getNumMsg(MsgPri pri)
const;
108 uint16_t getRest(MsgPri pri)
const;
116 Chateau_DeleteSemaphore(m_count_sem);
117 Chateau_CreateSemaphore(&m_count_sem, 0, 0);
123 void dumpQue(MsgPri pri)
const { m_que[pri].dump(); }
130 MsgQueBlock(MsgQueId
id, MsgCpuId owner, SpinLockId spinlock);
134 err_t setup(drm_t n_drm, uint16_t n_size, uint16_t n_num,
135 drm_t h_drm, uint16_t h_size, uint16_t h_num);
139 uint16_t getElemSize(MsgPri pri)
const {
return m_que[pri].elem_size(); }
144 bool isOwn()
const {
return GET_CPU_ID() == getOwner(); }
148 bool isOwn()
const {
return true; }
153 static size_t getSendSize(
const T& param,
bool type_check);
158 err_t send(MsgPri pri, MsgType type, MsgQueId reply, MsgFlags flags,
const T& param);
165 err_t sendIsr(MsgPri pri, MsgType type, MsgQueId reply,
const T& param);
171 void notifySend(MsgCpuId cpu, MsgQueId dest);
189 uint32_t total_pending;
190 uint16_t max_pending;
191 uint16_t max_queuing[NumMsgPri];
194 void clear() { memset(
this, 0x00,
sizeof(*
this)); }
196 printf(
"tally: total_pending=%ld, max_pending=%d, max_queuing=%d, %d\n",
197 total_pending, max_pending, max_queuing[MsgPriNormal], max_queuing[MsgPriHigh]);
204 const MsgCpuId m_owner;
205 Chateau_sem_handle_t m_count_sem;
208 const SpinLockId m_spinlock;
211 uint16_t m_pendingMsgCount;
223inline MsgQueBlock::MsgQueBlock(MsgQueId
id, MsgCpuId owner, SpinLockId spinlock) :
228 m_spinlock(spinlock),
229 m_pendingMsgCount(0),
233 m_count_sem.semcount = 0;
239inline err_t MsgQueBlock::setup(drm_t n_drm, uint16_t n_size, uint16_t n_num,
240 drm_t h_drm, uint16_t h_size, uint16_t h_num)
244 if (m_initDone !=
false)
251 m_que[MsgPriNormal].init(n_drm, n_size, n_num);
252 if (h_drm != INVALID_DRM)
254 m_que[MsgPriHigh].init(h_drm, h_size, h_num);
261 Dcache_clear(DRM_TO_CACHED_VA(n_drm), n_size * n_num);
262 if (h_drm != INVALID_DRM)
264 Dcache_clear(DRM_TO_CACHED_VA(h_drm), h_size * h_num);
270 Chateau_CreateSemaphore(&m_count_sem, 0, 0);
273 Dcache_flush(
this,
sizeof(*
this));
283inline uint16_t MsgQueBlock::getNumMsg(MsgPri pri)
const
285 D_ASSERT2(pri == MsgPriNormal || pri == MsgPriHigh,
AssertParamLog(AssertIdBadParam, pri));
292 Dcache_clear_sync(
this,
sizeof(*
this));
294 return m_que[pri].size();
301inline uint16_t MsgQueBlock::getRest(MsgPri pri)
const
303 D_ASSERT2(pri == MsgPriNormal || pri == MsgPriHigh,
AssertParamLog(AssertIdBadParam, pri));
310 Dcache_clear_sync(
this,
sizeof(*
this));
312 return m_que[pri].rest();
321size_t MsgQueBlock::getSendSize(
const T& ,
bool type_check)
330inline size_t MsgQueBlock::getSendSize<MsgNullParam>(
const MsgNullParam& ,
bool )
338inline size_t MsgQueBlock::getSendSize<MsgRangedParam>(
const MsgRangedParam& param,
bool )
349 static const bool typed_param =
true;
350 static const bool null_param =
false;
355 static const bool typed_param =
false;
356 static const bool null_param =
true;
361 static const bool typed_param =
false;
362 static const bool null_param =
false;
369err_t MsgQueBlock::send(MsgPri pri, MsgType type, MsgQueId reply, MsgFlags flags,
const T& param)
374 size_t send_size = getSendSize(param, type_check);
375 if (send_size > getElemSize(pri))
377 return ERR_DATA_SIZE;
397 Dcache_flush_clear(msg, MEMUTILS_ROUND_UP(
sizeof(
MsgPacketHeader), CACHE_BLOCK_SIZE));
410 msg->setParam(param, type_check);
414 DUMP_MSG_SEQ_LOCK(
MsgSeqLog(
's', m_id, pri, m_que[pri].size(), msg));
423 Dcache_flush_clear_sync(msg, MEMUTILS_ROUND_UP(send_size, CACHE_BLOCK_SIZE));
426 if (isShare() ==
false || isOwn())
430 Chateau_SignalSemaphoreTask(m_count_sem);
436 notifySend(m_owner, m_id);
447 return (msg) ? ERR_OK : ERR_QUE_FULL;
455err_t MsgQueBlock::sendIsr(MsgPri pri, MsgType type, MsgQueId reply,
const T& param)
459 D_ASSERT2(isShare() ==
false,
AssertParamLog(AssertIdBadMsgQueState, m_id));
464 if (getSendSize(param, type_check) > getElemSize(pri))
466 return ERR_DATA_SIZE;
478 msg->setParam(param, type_check);
482 Chateau_SignalSemaphoreIsr(m_count_sem);
483 DUMP_MSG_SEQ(
MsgSeqLog(
'i', m_id, pri, m_que[pri].size(), msg));
485 return (msg) ? ERR_OK : ERR_QUE_FULL;
494 D_ASSERT2(pri == MsgPriNormal || pri == MsgPriHigh,
AssertParamLog(AssertIdBadParam, pri));
497 D_ASSERT2(m_que[pri].is_init(),
AssertParamLog(AssertIdBadMsgQueState, m_id, pri));
499 D_ASSERT2(isOwn() || (isShare() && InterCpuLock::SpinLockManager::isMember(m_spinlock)),
505 MsgPacket* msg = m_que[pri].pushHeader(header);
507 if (m_que[pri].size() > m_tally.max_queuing[pri]) {
508 m_tally.max_queuing[pri] = m_que[pri].size();
511 DUMP_MSG_PEAK(m_id, pri,
MsgPeakLog(m_tally.max_queuing[pri], msg, m_que[pri].frontMsg()));
520inline void MsgQueBlock::notifyRecv() {
521 Chateau_SignalSemaphoreIsr(m_count_sem);
535 if (!(isOwn() && m_cur_que == NULL))
542 if (ms != TIME_FOREVER)
545 tm.tv_sec = ms / 1000;
546 tm.tv_nsec = ms % 1000;
547 result = Chateau_TimedWaitSemaphore(m_count_sem, tm);
551 result = Chateau_WaitSemaphore(m_count_sem);
570 MsgPri pri = (m_que[MsgPriHigh].size()) ? MsgPriHigh : MsgPriNormal;
571 MsgQue* que = &m_que[pri];
583 if (msg->getFlags() & MsgPacket::MsgFlagWaitParam)
595 m_tally.max_pending = MAX(m_pendingMsgCount, m_tally.max_pending);
596 ++m_tally.total_pending;
605 if (msg->getSrcCpu() != GET_CPU_ID())
607 Dcache_clear(msg, que->elem_size());
619 uint16_t pending = m_pendingMsgCount;
620 m_pendingMsgCount = 0;
636 Chateau_SignalSemaphoreTask(m_count_sem);
639 DUMP_MSG_SEQ_LOCK(
MsgSeqLog(
'r', m_id, pri, m_que[pri].size(), msg));
649inline err_t MsgQueBlock::pop()
653 if (!(isOwn() && m_cur_que != NULL))
664 if (msg->getParamSize() != 0)
673 if (m_cur_que->pop() ==
false)
684#if MSG_FILL_VALUE_AFTER_POP == 0x00
687 Dcache_clear(msg, m_cur_que->elem_size());
692 Dcache_flush_clear(msg, m_cur_que->elem_size());
704inline void MsgQueBlock::lock() {
705 if (isShare() ==
false) {
706 Chateau_LockInterrupt(&m_context);
709 InterCpuLock::SpinLockManager::acquire(m_spinlock);
712 Dcache_clear_sync(
this,
sizeof(*
this));
722inline void MsgQueBlock::unlock() {
723 if (isShare() ==
false) {
724 Chateau_UnlockInterrupt(&m_context);
729 Dcache_flush_clear_sync(
this,
sizeof(*
this));
730 InterCpuLock::SpinLockManager::release(m_spinlock);
744inline void MsgQueBlock::dump()
const
746 printf(
"ID:%d, init=%d, owner=%d, spinlock=%d, count_sem=%d, cur_pending=%d, cur_que=%p\n",
747 m_id, m_initDone, m_owner, m_spinlock, m_count_sem.semcount, m_pendingMsgCount, m_cur_que);
750 printf(
"Normal priority queue=%p\n", &m_que[MsgPriNormal]);
751 m_que[MsgPriNormal].dump();
753 if (m_que[MsgPriHigh].capacity()) {
754 printf(
"High priority queue=%p\n", &m_que[MsgPriHigh]);
755 m_que[MsgPriHigh].dump();
Definition: cpp_util.h:45
Message Library Class.
Definition: Message.h:88
Definition: MsgPacket.h:119
Definition: MsgPacket.h:142
Message Queue Class.
Definition: MsgQueBlock.h:72
Definition: MsgPacket.h:124
Definition: type_holder.h:122
err_t recv(uint32_t ms, FAR MsgPacket **packet)
Definition: MsgQueBlock.h:527
Definition: AssertInfo.h:210
Definition: MsgQueBlock.h:348
Definition: MsgQueBlock.h:188