mrfioc2  2.3.0
bufrxmgr.cpp
Go to the documentation of this file.
1 /*************************************************************************\
2 * Copyright (c) 2010 Brookhaven Science Associates, as Operator of
3 * Brookhaven National Laboratory.
4 * Copyright (c) 2015 Paul Scherrer Institute (PSI), Villigen, Switzerland
5 * mrfioc2 is distributed subject to a Software License Agreement found
6 * in file LICENSE that is included with this distribution.
7 \*************************************************************************/
8 /*
9  * Author: Michael Davidsaver <mdavidsaver@gmail.com>
10  */
11 
12 
13 
14 #include <cstdlib>
15 #include <cstdio>
16 #include <stdexcept>
17 
18 #include <errlog.h>
19 #include <epicsTypes.h>
20 #include <cantProceed.h>
21 #include <dbDefs.h>
22 #include "mrf/databuf.h"
23 
24 
25 #include <epicsExport.h>
26 #include "bufrxmgr.h"
27 
/*
 * CONTAINER(ptr, structure, member)
 *
 * Given `ptr`, the address of the field `member` embedded in an object of
 * type `structure`, yield a `structure*` to the enclosing object by
 * stepping back offsetof(structure, member) bytes.
 *
 * With GNU compilers `ptr` is first stored in a local declared as
 * "pointer to const <type of member>", so a pointer of an unrelated type is
 * rejected by the compiler.  Other compilers get the bare arithmetic
 * without that check.
 *
 * Nothing is defined here if a CONTAINER macro already exists.
 */
#ifndef CONTAINER
# ifdef __GNUC__
#  define CONTAINER(ptr, structure, member) ({                        \
        const __typeof(((structure*)0)->member) *_mptr = (ptr);       \
        (structure*)((char*)_mptr - offsetof(structure, member));     \
    })
# else
#  define CONTAINER(ptr, structure, member)                           \
        ((structure*)((char*)(ptr) - offsetof(structure, member)))
# endif
#endif
40 
/*
 * CBINIT(ptr, prio, fn, valptr)
 *
 * Fill in the callback structure at `ptr`: priority `prio`, callback
 * function `fn`, user pointer `valptr`, and a cleared `timer` field.
 * Wrapped in do { } while(0) so it can be used as a single statement.
 */
#define CBINIT(ptr, prio, fn, valptr)       \
    do {                                    \
        callbackSetPriority(prio, ptr);     \
        callbackSetCallback(fn, ptr);       \
        callbackSetUser(valptr, ptr);       \
        (ptr)->timer=NULL;                  \
    } while(0)
48 
49 extern int evrMrmSeqRxDebug;
50 
51 static
52 void defaulterr(void *, epicsStatus err,
53  epicsUInt32 , const epicsUInt8* )
54 {
55  switch(err) {
56  case 0: break; // no error
57  case 1: errlogPrintf("Data buffer Rx rate too high, dropped buffers\n"); break;
58  case 2: errlogPrintf("Data buffer Rx checksum error\n"); break;
59  default: errlogPrintf("Data buffer Rx error %d\n",err);
60  }
61 }
62 
63 bufRxManager::bufRxManager(const std::string& n, unsigned int qdepth, unsigned int bsize)
64  :dataBufRx(n)
65  ,guard()
66  ,onerror(defaulterr)
67  ,onerror_arg(NULL)
68  ,m_bsize(bsize ? bsize : 2048)
69 {
70  ellInit(&dispatch);
71  ellInit(&freebufs);
72  ellInit(&usedbufs);
73 
74  CBINIT(&received_cb, priorityMedium, &bufRxManager::received, this);
75 
76  for(unsigned int i=0; i<qdepth; i++) {
77  buffer *t=(buffer*)callocMustSucceed(1, sizeof(buffer)-1+m_bsize, "bufRxManager buffer");
78  ellAdd(&freebufs, &t->node);
79  }
80 }
81 
83 {
84  ELLNODE *node, *next;
85 
86  SCOPED_LOCK(guard);
87 
88  for(node=ellFirst(&freebufs), next=node ? ellNext(node):NULL;
89  node;
90  node=next, next=next ? ellNext(next) : NULL)
91  {
92  buffer *b=CONTAINER(node, buffer, node);
93  free(b);
94  }
95 }
96 
97 epicsUInt8*
98 bufRxManager::getFree(unsigned int* blen)
99 {
100  ELLNODE *node;
101  {
102  SCOPED_LOCK(guard);
103  node=ellGet(&freebufs);
104  }
105 
106  if (!node)
107  return NULL;
108 
109  buffer *buf=CONTAINER(node, buffer, node);
110 
111  if (blen) *blen=bsize();
112 
113  buf->used=0;
114  return buf->data;
115 }
116 
117 void
118 bufRxManager::receive(epicsUInt8* raw,unsigned int usedlen)
119 {
120  /* CONTAINER doesn't work when the member is a pointer
121  * because the GNU version's check isn't correct
122  buffer *buf=CONTAINER(raw, buffer, data);
123  */
124  buffer *buf=(buffer*)((char*)(raw) - offsetof(buffer, data));
125 
126  if (usedlen>bsize())
127  throw std::out_of_range("User admitted overflowing Rx buffer");
128  buf->used=usedlen;
129 
130  if (usedlen==0) {
131  // buffer returned w/o being used
132  {
133  SCOPED_LOCK(guard);
134  ellAdd(&freebufs, &buf->node);
135  }
136  if(evrMrmSeqRxDebug>=2) {
137  errlogPrintf("buffer ignored\n");
138  }
139  return;
140  }
141 
142  {
143  SCOPED_LOCK(guard);
144  ellAdd(&usedbufs, &buf->node);
145  }
146 
147  callbackRequest(&received_cb);
148 }
149 
150 void
151 bufRxManager::received(CALLBACK* cb)
152 {
153  void *vptr;
154  callbackGetUser(vptr,cb);
155  bufRxManager &self=*static_cast<bufRxManager*>(vptr);
156 
157  SCOPED_LOCK2(self.guard, G);
158 
159  while(true) {
160  ELLNODE *node=ellGet(&self.usedbufs);
161 
162  if (!node)
163  break;
164  buffer *buf=CONTAINER(node, buffer, node);
165 
166  G.unlock();
167 
168  for(ELLNODE *cur=ellFirst(&self.dispatch); cur; cur=ellNext(cur)) {
169  listener *action=CONTAINER(cur, listener, node);
170  if(evrMrmSeqRxDebug>=3) {
171  errlogPrintf("buffer listener %p\n", action);
172  }
173  (action->fn)(action->fnarg, 0, buf->used, buf->data);
174  }
175 
176  G.lock();
177 
178  ellAdd(&self.freebufs, &buf->node);
179  };
180 }
181 
182 void
184 {
185  SCOPED_LOCK(guard);
186  onerror=fn;
187  onerror_arg=arg;
188 }
189 
190 void
192 {
193  listener *l;
194 
195  SCOPED_LOCK(guard);
196 
197  for(ELLNODE *node=ellFirst(&dispatch); node; node=ellNext(node))
198  {
199  l=CONTAINER(node, listener, node);
200  // Don't add duplicates
201  if (l->fn==fn && l->fnarg==arg) {
202  return;
203  }
204  }
205 
206  l=new listener;
207  l->fn=fn;
208  l->fnarg=arg;
209 
210  ellAdd(&dispatch, &l->node);
211 }
212 
213 void
215 {
216  listener *l;
217 
218  SCOPED_LOCK(guard);
219 
220  for(ELLNODE *node=ellFirst(&dispatch); node; node=ellNext(node))
221  {
222  l=CONTAINER(node, listener, node);
223  if (l->fn==fn && l->fnarg==arg) {
224  ellDelete(&dispatch, node);
225  delete l;
226  return;
227  }
228  }
229 }
virtual void dataRxAddReceive(dataBufComplete fptr, void *arg=0) OVERRIDE FINAL
Register to receive data buffers.
Definition: bufrxmgr.cpp:191
virtual ~bufRxManager()
Definition: bufrxmgr.cpp:82
#define CONTAINER(ptr, structure, member)
Definition: bufrxmgr.cpp:36
void receive(epicsUInt8 *, unsigned int)
Definition: bufrxmgr.cpp:118
virtual void dataRxError(dataBufComplete, void *) OVERRIDE FINAL
Notification if Rx queue overflows.
Definition: bufrxmgr.cpp:183
int evrMrmSeqRxDebug
Definition: drvem.cpp:62
virtual void dataRxDeleteReceive(dataBufComplete fptr, void *arg=0) OVERRIDE FINAL
Unregister.
Definition: bufrxmgr.cpp:214
unsigned int bsize()
Definition: bufrxmgr.h:28
void(* dataBufComplete)(void *arg, epicsStatus ok, epicsUInt32 len, const epicsUInt8 *buf)
Definition: databuf.h:31
bufRxManager(const std::string &, unsigned int qdepth, unsigned int bsize=0)
Definition: bufrxmgr.cpp:63
epicsUInt8 * getFree(unsigned int *)
Definition: bufrxmgr.cpp:98
#define CBINIT(ptr, prio, fn, valptr)
Definition: bufrxmgr.cpp:41