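Jobs are what the user constructs and uses (a Receiver job receives data, a Sender job sends data). A Blob is constructed by a Job; users cannot construct a Blob themselves.
The user creates a receiver job (DDaqReceiverJob), which obtains data through the DPDK protocol stack. The Job then calls DDaqBlob's appendData (modes 1 and 2) or writeData (mode 3) to write data into the Blob, and the user reads the data back through the Blob's read functions, readData or readDataNoCopy.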
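The ownership rule described above (only a Job constructs and writes a Blob, users only read it) can be sketched with a private constructor plus a friend declaration. `BlobSketch`, `JobSketch`, `readSample`, `writeSample`, and `onDataReceived` are hypothetical names used only for illustration. This is one possible way to enforce the rule under those assumptions, not the actual DDaq implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

class JobSketch; // hypothetical stand-in for a receiver job

// Hypothetical stand-in for a blob: anyone may read it, but only the
// friend job may construct it or write to it.
class BlobSketch
{
    friend class JobSketch;

public:
    // Public read interface for users.
    char readSample(std::size_t index) const
    {
        assert(index < data_.size());
        return data_[index];
    }

    std::size_t size() const { return data_.size(); }

private:
    // Private constructor: user code cannot create a blob directly.
    explicit BlobSketch(std::size_t sampleCount) : data_(sampleCount, 0) {}

    // Private write path: only the friend job can call this.
    void writeSample(std::size_t index, char value)
    {
        assert(index < data_.size());
        data_[index] = value;
    }

    std::vector<char> data_;
};

class JobSketch
{
public:
    // The job constructs its own blob.
    explicit JobSketch(std::size_t sampleCount) : blob_(sampleCount) {}

    // Stand-in for "data arrived from the network stack".
    void onDataReceived(std::size_t index, char value)
    {
        blob_.writeSample(index, value);
    }

    // Users get read-only access to the blob.
    const BlobSketch &blob() const { return blob_; }

private:
    BlobSketch blob_;
};
```

With this layout, `BlobSketch b(4);` or `job.blob().writeSample(0, 1);` in user code fails to compile, while `job.blob().readSample(0)` is allowed.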
class DDaqReceiverJob
{
private:
/* data */
public:
DDaqReceiverJob(/* args */); // the constructor also constructs a blob
~DDaqReceiverJob();
/// the ip address and port of the receiver
std::string ipAddress;
int port;
// the mode of this job: 1. blob mode: once the whole blob has been received, the blob head is shifted to the latest sample position
// 2. stream mode: whenever new samples are received, the blob head is shifted to the latest sample position
// 3. reflect memory mode: the blob head is never shifted; the sample at the specified position in the blob is replaced with the new sample
// This is not reflective memory in the full sense: in this mode only one Sender writes to the memory block, while there may be multiple Receivers. Whenever the Sender writes to the block, every Receiver's copy of the block receives the data (Receivers can only read the block, not write to it)
int mode;
// the id of this job, must match the id of the sender job
int jobId;
// the reference to the blob object
DDaqBlob *blob;
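// This is not valid C++ syntax; it only indicates that the function fires an event and that users can register and unregister handlers (+= and -=)
// when the blob is updated (modes 1 and 2: the head is shifted; mode 3: the blob content is changed), fire this event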
event void onBlobUpdated(DDaqBlob *blob)
{
// do something with the blob
}
// when the sender explicitly sends an event with a message, fire this event; only works in mode 3
event void onMessageReceived(std::string message)
{
// do something with the message
}
};
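Since `event` is not C++, the register/unregister behaviour that the class above hints at could be approximated with a small helper. The sketch below is an assumption, not the DDaq API: `Event`, `add`, `remove`, and `fire` are hypothetical names, and a token returned by `add` stands in for `+=`/`-=` because `std::function` objects cannot be compared for equality.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical helper: an event that handlers can be added to and removed from.
template <typename... Args>
class Event
{
public:
    using Handler = std::function<void(Args...)>;
    using Token = std::size_t;

    // Register a handler (the "+=" of the pseudocode).
    // The returned token identifies the handler for later removal.
    Token add(Handler handler)
    {
        assert(handler); // an empty std::function would throw when fired
        const Token token = nextToken_++;
        handlers_.emplace(token, std::move(handler));
        return token;
    }

    // Unregister a handler (the "-=" of the pseudocode).
    // Returns false if the token is unknown or already removed.
    bool remove(Token token) { return handlers_.erase(token) > 0; }

    // Call every registered handler in registration order.
    // Handlers must not add or remove handlers while the event is firing.
    void fire(Args... args) const
    {
        for (const auto &entry : handlers_)
            entry.second(args...);
    }

    std::size_t size() const { return handlers_.size(); }

private:
    std::map<Token, Handler> handlers_; // ordered by token, i.e. by registration
    Token nextToken_ = 0;
};
```

A receiver job could then expose a member such as `Event<const std::string &> onMessageReceived;` and call `fire(message)` whenever a message arrives. Thread safety is left out of this sketch and would have to be added if events fire from the receiving thread.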
A Blob is a container that stores data. Users cannot write to a Blob directly (writes must go through a Job), but they can read data from it.
// the blob object, it is a container for the data
class DDaqBlob
{
// friend classes are used here so that the three private functions below can only be accessed by the job
// only the job can access the blob, the blob is created by the job, and the buffer is allocated in the job
friend class DDaqReceiverJob;
friend class DDaqSenderJob;
// the following three functions are provided for the Job; only the Job can access them
// write data to the blob; only the job can do this, once the data has been received or sent
// only call this in mode 3
// does not block reading: the data is written to the buffer,
// and the blob is updated by shifting the head of the ping-pong buffer
void writeData(int channelStart, int channelCount,
int sampleStart, int sampleCount,
char *dataBuffer);
// only call this in mode 1 or 2, once the data has been received or sent
// no need to block reading, since the data is appended ahead of the blob head
// always append all the channels; the sample count may vary
// samples may not arrive in order, so the sample index specifies the sample's position in the blob
// the index is cyclic: when it reaches the end of the blob, it starts again from 0
// a missing sample is considered lost forever once the sample that is X samples later (X is a config of the job) has been received;
// it is then filled with 0 or with the sample of the same index from the last blob
void appendData(int sampleCount, int sampleIndex, char *dataBuffer);
// lock reading, update the read head of the blob
// in mode 3, just flip the ping-pong buffer
// in mode 1,2, shift to the latest sample position
void updateBlobHead();
private:
/* data */
// the buffer for the data; it is larger than the actual blob size
// the actual blob size is channel * sampleSize,
// the buffer size is channel * (sampleSize + additionalSize)
// the additional size is used:
// in mode 1: for the next incoming blob
// in mode 2: for the incoming samples; in both modes 1 and 2 the additional size is also reserved for no-copy reads
// in mode 3: as the second half of the ping-pong buffer, so the additional size equals sampleSize
// for a sender the buffer is the same size as the blob; the additional size is not used
char *buffer;
public:
// specify the channel and sample size, receiver or sender
// the blob is created by the job, and the buffer is allocated in the job
DDaqBlob(int channel, int sampleSize, int additionalSize, bool isSender = false);
~DDaqBlob();
// the following three functions are the read interface provided to users
// read the data from the buffer, with copy
void readData(int channelStart, int channelCount,
int sampleStart, int sampleCount,
char *dataBuffer);
// read the data from the buffer without copying; returns a pointer into the buffer
// must call finishReadNoCopy to finish the read and release the buffer pointer
// although the reader specifies the channel and sample start and count, the tail pointer only moves along the sample axis, not the channel axis
char *readDataNoCopy(int channelStart, int channelCount,
int sampleStart, int sampleCount);
// finish a no-copy read and release the buffer pointer
void finishReadNoCopy(char *readBuffer);
};
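The cyclic sample index and the "lost forever" rule described in the appendData comments can be sketched as follows. Everything here is an assumption for illustration: `RingSketch`, `forwardDistance`, and `isLostForever` are hypothetical names, the layout is taken to be sample-major with one byte per channel value, and "X samples later" is read as a cyclic forward distance of at least X.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical ring of sample slots; each slot holds one byte per channel.
struct RingSketch
{
    int channels;
    int capacity;           // number of sample slots in the ring
    std::vector<char> data; // capacity * channels bytes, zero-initialised

    RingSketch(int channelCount, int sampleCapacity)
        : channels(channelCount), capacity(sampleCapacity),
          data(static_cast<std::size_t>(channelCount) * sampleCapacity, 0)
    {
        assert(channelCount > 0 && sampleCapacity > 0);
    }

    // Write sampleCount samples starting at sampleIndex; the slot index
    // wraps back to 0 when it reaches the end of the ring.
    void appendData(int sampleCount, int sampleIndex, const char *dataBuffer)
    {
        assert(sampleCount >= 0 && sampleCount <= capacity);
        assert(sampleIndex >= 0 && sampleIndex < capacity);
        for (int i = 0; i < sampleCount; ++i)
        {
            const int slot = (sampleIndex + i) % capacity;
            std::memcpy(&data[static_cast<std::size_t>(slot) * channels],
                        dataBuffer + static_cast<std::size_t>(i) * channels,
                        static_cast<std::size_t>(channels));
        }
    }

    char at(int slot, int channel) const
    {
        return data[static_cast<std::size_t>(slot) * channels + channel];
    }
};

// Number of slots to step forward from `from` to reach `to` in the ring.
int forwardDistance(int from, int to, int capacity)
{
    return ((to - from) % capacity + capacity) % capacity;
}

// A missing sample is given up once a sample at least lostWindow slots
// later has arrived. Assumes lostWindow < capacity and that latestIndex
// is never behind missingIndex.
bool isLostForever(int missingIndex, int latestIndex, int lostWindow, int capacity)
{
    return forwardDistance(missingIndex, latestIndex, capacity) >= lostWindow;
}
```

For example, with a capacity of 4, appending two samples at index 3 fills slot 3 and then slot 0. A sample missing at index 3 is treated as lost once index 1 has arrived if the window X is 2, at which point the job would fill it with zeros or with the value from the previous blob.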
// the sender job, it is the sender of the data
class DDaqSenderJob
{
private:
/* data */
public:
DDaqSenderJob(/* args */);
~DDaqSenderJob();
// the ip address and port of the sender
std::string ipAddress;
int port;
// for a sender job there is no difference between blob mode and stream mode;
// it just streams samples to the receiver.
int mode;
// the id of this job, must match the id of the receiver job
int jobId;
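// After sending data, the user may still want to keep a history copy, hence the concept of a MirrorBlob: once data has been sent, it is also appended/written to the MirrorBlob
// if true, the blob is mirrored
// if false, this job does not need a blob object; it just sends and forgets
bool isMirrorBlob;
// constructed and initialized when isMirrorBlob is true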
DDaqBlob *blob;
// send samples to the receiver; only call this in mode 1 or 2
// data is always sent with all the channels; the sample count may vary
// the slice is constructed in this function;
// each slice carries a sample index that specifies the position of its samples in the blob
// when the blob is mirrored, update the sender's blob after sending
void sendData(int sampleCount, char *dataBuffer);
// call this in mode 3
// when the blob is mirrored, update the sender's blob after sending
void writeData(int channelStart, int channelCount,
int sampleStart, int sampleCount,
char *dataBuffer);
// This is not valid C++ syntax; it only indicates that the function fires an event and that users can register and unregister handlers (+= and -=)
event void onDateSend(std::string message)
{
// do something with the message
}
};
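The "send, then mirror" behaviour of sendData might look roughly like the sketch below. It is an illustration under stated assumptions: `SenderSketch` and `Transport` are hypothetical names, the network layer is replaced by an injected callback so that no real DPDK call is implied, the mirror is an unbounded vector rather than a fixed-size cyclic blob, and the running sample index does not wrap.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical sender: sends samples through an injected transport and,
// if mirroring is enabled, keeps a local copy of everything it sent.
class SenderSketch
{
public:
    // Stand-in for the real transport: receives the slice's first sample
    // index, the payload, and the payload size in bytes.
    using Transport =
        std::function<void(int sampleIndex, const char *data, std::size_t size)>;

    SenderSketch(int channels, bool isMirrorBlob, Transport transport)
        : channels_(channels), isMirrorBlob_(isMirrorBlob),
          transport_(std::move(transport))
    {
        assert(channels > 0);
        assert(transport_);
    }

    // Send sampleCount samples (all channels, one byte per channel value)
    // tagged with the running sample index, then mirror them if requested.
    void sendData(int sampleCount, const char *dataBuffer)
    {
        assert(sampleCount >= 0);
        const std::size_t size = static_cast<std::size_t>(sampleCount) * channels_;
        transport_(nextSampleIndex_, dataBuffer, size);
        if (isMirrorBlob_)
            mirror_.insert(mirror_.end(), dataBuffer, dataBuffer + size);
        nextSampleIndex_ += sampleCount;
    }

    const std::vector<char> &mirror() const { return mirror_; }
    int nextSampleIndex() const { return nextSampleIndex_; }

private:
    int channels_;
    bool isMirrorBlob_;
    Transport transport_;
    std::vector<char> mirror_; // local history, only filled when mirroring
    int nextSampleIndex_ = 0;
};
```

When `isMirrorBlob` is false the sender keeps nothing after the transport call, which matches the "send and forget" case. Updating the mirror only after the transport call follows the order given in the comments above.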