//=========================================================================== // @(#) $Name: cflowd-2-1-a3 $ // @(#) $Id: CflowdPacketQueue.cc,v 1.9 1999/02/08 01:07:01 dwm Exp $ //=========================================================================== // CAIDA Copyright Notice // // By accessing this software, cflowd++, you are duly informed // of and agree to be bound by the conditions described below in this // notice: // // This software product, cflowd++, is developed by Daniel W. McRobb, and // copyrighted(C) 1998 by the University of California, San Diego // (UCSD), with all rights reserved. UCSD administers the CAIDA grant, // NCR-9711092, under which part of this code was developed. // // There is no charge for cflowd++ software. You can redistribute it // and/or modify it under the terms of the GNU General Public License, // v. 2 dated June 1991 which is incorporated by reference herein. // cflowd++ is distributed WITHOUT ANY WARRANTY, IMPLIED OR EXPRESS, OF // MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE or that the use // of it will not infringe on any third party's intellectual property // rights. // // You should have received a copy of the GNU GPL along with cflowd++. // Copies can also be obtained from: // // http://www.gnu.org/copyleft/gpl.html // // or by writing to: // // University of California, San Diego // // SDSC/CAIDA // 9500 Gilman Dr., MS-0505 // La Jolla, CA 92093 - 0505 USA // // Or contact: // // info@caida.org //=========================================================================== extern "C" { #include "aclocal.h" #include #include #include #include #include #include #include #include #include #include #include #include #include "caida_t.h" #include "CflowdFlowPdu.h" #ifndef MAP_FAILED #define MAP_FAILED ((caddr_t)-1) #endif #ifdef SEMCTL_NEEDS_UNION #ifdef SEMCTL_NEEDS_SEMUN_DEFN union semun { int val; struct semid_ds *buf; ushort *array; }; #endif #endif } #include #include "CflowdPacketQueue.hh" static const string rcsid = "@(#) $Name: cflowd-2-1-a3 $ $Id: CflowdPacketQueue.cc,v 1.9 1999/02/08 01:07:01 dwm Exp $"; //------------------------------------------------------------------------- // CflowdPacketQueue::CflowdPacketQueue() //......................................................................... // constructor //------------------------------------------------------------------------- CflowdPacketQueue::CflowdPacketQueue() { this->_mapAddr[0] = (caddr_t)MAP_FAILED; this->_mapAddr[1] = (caddr_t)MAP_FAILED; this->_mapLength = 0; this->_writePtr = (caddr_t)0; this->_currentBuffer = 0; this->_shmId = 0; } //------------------------------------------------------------------------- // CflowdPacketQueue::~CflowdPacketQueue() //......................................................................... // destructor //------------------------------------------------------------------------- CflowdPacketQueue::~CflowdPacketQueue() { if (this->_mapAddr[0] != (caddr_t)-1) { // detach shared memory segment. if (shmdt(this->_mapAddr[0]) < 0) { syslog(LOG_ERR,"[E] shmdt(%#x) failed: %m {%s:%d}", this->_mapAddr[0],__FILE__,__LINE__); } } } //------------------------------------------------------------------------- // int CflowdPacketQueue::GetSemaphore(const char *name) //......................................................................... // //------------------------------------------------------------------------- int CflowdPacketQueue::GetSemaphore(const char *name) { int mapsemid = semget(ftok((char *)name,0),2,S_IRWXU|S_IRWXG|S_IRWXO); if (mapsemid < 0) { syslog(LOG_ERR, "[E] semget(ftok(\"%s\",0),2,S_IRWXU|S_IRWXG|S_IRWXO)" " failed to get semaphore set: %m {%s:%d}",name,__FILE__,__LINE__); return(-1); } this->_semId[0] = mapsemid; mapsemid = semget(ftok((char *)name,1),2,S_IRWXU|S_IRWXG|S_IRWXO); if (mapsemid < 0) { syslog(LOG_ERR, "[E] semget(ftok(\"%s\",0),2,S_IRWXU|S_IRWXG|S_IRWXO)" " failed to get semaphore set: %m {%s:%d}",name,__FILE__,__LINE__); return(-1); } this->_semId[1] = mapsemid; syslog(LOG_INFO,"[I] got semaphore: id %d %d", this->_semId[0], this->_semId[1]); return(0); } //------------------------------------------------------------------------- // int CflowdPacketQueue::CreateSemaphore(const char *name) //......................................................................... // //------------------------------------------------------------------------- int CflowdPacketQueue::CreateSemaphore(const char *name) { int mapsemid; errno = 0; mapsemid = semget(ftok((char *)name,0),2,IPC_CREAT|S_IRWXU|S_IRWXG|S_IRWXO); if (mapsemid < 0) { syslog(LOG_ERR, "[E] semget(ftok(\"%s\",0),2,IPC_CREAT|S_IRWXU|S_IRWXG|S_IRWXO)" " failed: %m {%s:%d}",name,__FILE__,__LINE__); if (errno == EACCES) { // looks like semaphore already exists. Try using it. mapsemid = semget(ftok((char *)name,0),2,S_IRWXU|S_IRWXG|S_IRWXO); if (mapsemid < 0) { // Hmm, couldn't use it. Try recreating it. mapsemid = semget(ftok((char *)name,0),2,0); if (mapsemid < 0) { // Yikes, cound't even retrieve it. syslog(LOG_CRIT, "[C] semget(ftok(\"%s\",0),2,0) failed: %m {%s:%d}", name,__FILE__,__LINE__); } else { // Try to remove it. #ifdef SEMCTL_NEEDS_UNION { union semun semArg; semArg.val = 1; if (semctl(mapsemid,0,IPC_RMID,semArg) < 0) { syslog(LOG_CRIT, "[C] semctl(%d,0,IPC_RMID,semArg) " "(remove semaphore) failed: %m {%s:%d}", mapsemid,__FILE__,__LINE__); } } #else if (semctl(mapsemid,0,IPC_RMID,NULL) < 0) { syslog(LOG_CRIT, "[C] semctl(%d,0,IPC_RMID,NULL) " "(remove semaphore) failed: %m {%s:%d}", mapsemid,__FILE__,__LINE__); } #endif // And try recreating it. mapsemid = semget(ftok((char *)name,0),2, IPC_CREAT|S_IRWXU|S_IRWXG|S_IRWXO); } } } } // Create 2 semaphores for each buffer instead of just one // so they interlock between applications - STN 9/12/00 this->_semId[0] = mapsemid; mapsemid = semget(ftok((char *)name,1),2,IPC_CREAT|S_IRWXU|S_IRWXG|S_IRWXO); if (mapsemid < 0) { syslog(LOG_ERR, "[E] semget(ftok(\"%s\",0),2,IPC_CREAT|S_IRWXU|S_IRWXG|S_IRWXO)" " failed: %m {%s:%d}",name,__FILE__,__LINE__); } this->_semId[1] = mapsemid; return(0); } //------------------------------------------------------------------------- // int CflowdPacketQueue::Create(const char *name, int mapSize) //......................................................................... // //------------------------------------------------------------------------- int CflowdPacketQueue::Create(const char *name, int mapSize) { int shmId; assert(name != (char *)0); // always make map length a multiple of 4K this->_mapLength = ((mapSize / 4096) + 1) * 4096; // create shared memory packet queue. shmId = shmget(ftok((char *)name,0),this->_mapLength, IPC_CREAT|S_IRWXU|S_IRGRP|S_IROTH); if (shmId < 0) { // try deleting segment and creating it anew. shmId = shmget(ftok((char *)name,0),4,S_IRWXU|S_IRGRP|S_IROTH); if (shmId >= 0) { syslog(LOG_INFO, "[I] packet queue shmem segment already exists {%s:%d}", __FILE__,__LINE__); // try to delete old segment. if (shmctl(shmId,IPC_RMID,0) < 0) { syslog(LOG_ERR, "[E] shmctl(%d,IPC_RMID,0) failed: %m {%s:%d}", shmId,__FILE__,__LINE__); } else { syslog(LOG_INFO, "[I] removed old packet queue shmem segment {%s:%d}", __FILE__,__LINE__); shmId = shmget(ftok((char *)name,0),this->_mapLength, IPC_CREAT|S_IRWXU|S_IRGRP|S_IROTH); } } if (shmId < 0) { syslog(LOG_ERR, "[E] shmget(ftok(\"%s\",0),%d,IPC_CREAT|S_IRWXU|S_IRGRP|S_IROTH)" " failed: %m {%s:%d}",name,this->_mapLength,__FILE__,__LINE__); return(-1); } } syslog(LOG_INFO,"[I] created %u byte packet queue shmem segment {%s:%d}", this->_mapLength,__FILE__,__LINE__); // attach to shared memory packet queue. if ((this->_mapAddr[0] = (caddr_t)shmat(shmId,0,0)) == (caddr_t)-1) { syslog(LOG_ERR, "[E] shmat(%d,0,0) failed: %m {%s:%d}",shmId,__FILE__,__LINE__); if (shmctl(shmId,IPC_RMID,0) < 0) { syslog(LOG_CRIT,"[C] shmctl(%d,IPC_RMID,0) failed: %m {%s:%d}", shmId,__FILE__,__LINE__); } return(-1); } syslog(LOG_INFO,"[I] attached to %d byte packet queue at %#x", this->_mapLength,this->_mapAddr[0]); // create packet queue control semaphore. if (this->CreateSemaphore(name) < 0) { syslog(LOG_CRIT,"[C] Failed to create semaphore! {%s:%d}", __FILE__,__LINE__); if (shmdt(this->_mapAddr[0]) < 0) { syslog(LOG_CRIT,"[C] shmdt(%#x) failed: %m {%s:%d}", this->_mapAddr[0],__FILE__,__LINE__); } if (shmctl(shmId,IPC_RMID,0) < 0) { syslog(LOG_CRIT,"[C] shmctl(%d,IPC_RMID,0) failed: %m {%s:%d}", shmId,__FILE__,__LINE__); } this->_mapAddr[0] = (caddr_t)-1; return(-1); } this->_shmId = shmId; this->_mapAddr[1] = this->_mapAddr[0] + (this->_mapLength/2); this->_currentBuffer = 0; this->_writePtr = this->_mapAddr[this->_currentBuffer] + sizeof(int); // Initialize all semaphore values - STN 9/12/00 semctl(this->_semId[0], 0, SETVAL, 1); semctl(this->_semId[0], 1, SETVAL, 1); semctl(this->_semId[1], 0, SETVAL, 1); semctl(this->_semId[1], 1, SETVAL, 1); return(0); } //------------------------------------------------------------------------- // int CflowdPacketQueue::Destroy() //......................................................................... // //------------------------------------------------------------------------- int CflowdPacketQueue::Destroy() { if (this->_mapAddr[0] != (caddr_t)-1) { // detach shared memory segment. if (shmdt(this->_mapAddr[0]) < 0) { syslog(LOG_ERR,"[E] shmdt(%#x) failed: %m {%s:%d}", this->_mapAddr[0],__FILE__,__LINE__); return(-1); } this->_mapAddr[0] = (caddr_t)-1; this->_mapAddr[1] = (caddr_t)-1; this->_currentBuffer = 0; this->_writePtr = (caddr_t)0; this->_mapLength = 0; // remove shared memory segment. if (shmctl(this->_shmId,IPC_RMID,0) < 0) { syslog(LOG_CRIT,"[C] shmctl(%d,IPC_RMID,0) failed: %m {%s:%d}", this->_shmId,__FILE__,__LINE__); return(-1); } } this->_shmId = 0; return(0); } //------------------------------------------------------------------------- // int CflowdPacketQueue::Open(const char *name, int mapSize) //......................................................................... // //------------------------------------------------------------------------- int CflowdPacketQueue::Open(const char *name, int mapSize) { int shmId; if (this->GetSemaphore(name) < 0) { syslog(LOG_CRIT,"[C] failed to get semaphore! {%s:%d}", __FILE__,__LINE__); return(-1); } // always make map length a multiple of 4K this->_mapLength = ((mapSize / 4096) + 1) * 4096; shmId = shmget(ftok((char *)name,0),this->_mapLength, SHM_R|(SHM_R>>3)|(SHM_R>>6)); if (shmId < 0) { syslog(LOG_ERR, "[E] shmget(ftok(\"%s\",0),%d,SHM_R|(SHM_R>>3)|(SHM_R>>6))" " failed: %m {%s:%d}",name,this->_mapLength,__FILE__,__LINE__); return(-1); } if ((this->_mapAddr[0] = (caddr_t)shmat(shmId,0,0)) == (caddr_t)-1) { syslog(LOG_ERR, "[E] shmat(%d,0,0) failed: %m {%s:%d}",shmId,__FILE__,__LINE__); return(-1); } syslog(LOG_INFO,"[I] attached to %d byte packet queue at %#x", this->_mapLength,this->_mapAddr[0]); this->_shmId = shmId; this->_mapAddr[1] = this->_mapAddr[0] + (this->_mapLength/2); // start out using buffer 0 this->_currentBuffer = 0; this->_readPtr = this->_mapAddr[this->_currentBuffer] + sizeof(int); // We don't do an explicit lock here, because readers might idle // around after opening and they'll be doing mass reads with a // lock/unlock around the mass read. // release 2nd buffer so cflowdmux can write to it while cflowd reads 1st buffer - STN 9/12/00 this->_currentBuffer = 1; this->ReleaseLock(true); this->_currentBuffer = 0; // this->GetLock(); return(0); } //------------------------------------------------------------------------- // int CflowdPacketQueue::Close() //......................................................................... // //------------------------------------------------------------------------- int CflowdPacketQueue::Close() { if (this->_mapAddr[0] != (caddr_t)-1) { if (shmdt(this->_mapAddr[0]) < 0) { syslog(LOG_ERR,"[E] shmdt(%#x) failed: %m {%s:%d}", this->_mapAddr[0],__FILE__,__LINE__); return(-1); } else { this->_mapAddr[0] = (caddr_t)-1; this->_mapAddr[1] = (caddr_t)-1; this->_writePtr = (caddr_t)-1; this->_readPtr = (caddr_t)-1; this->_mapLength = 0; this->_shmId = 0; this->_semId[0] = 0; this->_semId[1] = 0; } } return(0); } //------------------------------------------------------------------------- // int CflowdPacketQueue::GetLock() //......................................................................... // //------------------------------------------------------------------------- int CflowdPacketQueue::GetLock(bool appnum) { int sem_val; // determine which semaphore to get by application - STN 9/12/00 int app; if (appnum) app = 0; else app = 1; // Get a lock for our current buffer. struct sembuf opLockCurrentBuffer[2] = { {this->_currentBuffer,0,0}, // wait (removed SEM_UNDO) {this->_currentBuffer,1,0} }; // lock (removed SEM_UNDO) if (semop(this->_semId[app],&opLockCurrentBuffer[0],2) < 0) { syslog(LOG_ERR,"[E] semop(%d,%#x,2) failed to get semaphore: %m %ld {%s:%d}", this->_semId[app],&opLockCurrentBuffer[0],errno,__FILE__,__LINE__); syslog(LOG_ERR,"[E] current buffer is %d", this->_currentBuffer); sem_val = semctl(this->_semId[app], this->_currentBuffer, GETVAL, 0); syslog(LOG_ERR,"[E] current value is %d", sem_val); return(-1); } sem_val = semctl(this->_semId[app], this->_currentBuffer, GETVAL, 0); if (sem_val != 1) { syslog(LOG_ERR,"[E] (GetLock) taken semaphore value is not 1: %d", sem_val); } //FILE *file = fopen("/tmp/cflow.txt","a+"); //int sem_val = semctl(this->_semId[app], this->_currentBuffer, GETVAL, 0); //fprintf(file,"Got sem %d val: %ld by app %d\n", this->_currentBuffer, sem_val, (int)appnum); //fclose(file); return(0); } //------------------------------------------------------------------------- // int CflowdPacketQueue::ReleaseLock() //......................................................................... // //------------------------------------------------------------------------- int CflowdPacketQueue::ReleaseLock(bool appnum) { // determine which semaphore to release by application - STN 9/12/00 int app; if (appnum) app = 1; else app = 0; // Release the lock on our current buffer. struct sembuf opUnlockCurrentBuffer[1] = { {this->_currentBuffer, -1, IPC_NOWAIT} }; //(removed SEM_UNDO) int sem_val = semctl(this->_semId[app], this->_currentBuffer, GETVAL, 0); if (sem_val != 1) { syslog(LOG_ERR,"[E] (ReleaseLock) taken semaphore value is not 1: %d", sem_val); } if (semop(this->_semId[app],&opUnlockCurrentBuffer[0],1) < 0) { syslog(LOG_ERR, "[E] semop(%d,%#x,1) failed to release buffer lock: %m %ld {%s:%d}", this->_semId[app],&opUnlockCurrentBuffer[0],errno,__FILE__,__LINE__); syslog(LOG_ERR,"[E] current buffer is %d", this->_currentBuffer); syslog(LOG_ERR,"[E] value before decrement: %d", sem_val); sem_val = semctl(this->_semId[app], this->_currentBuffer, GETVAL, 0); syslog(LOG_ERR,"[E] value after decrement (another get may have occured): %d", sem_val); return(-1); } //FILE *file = fopen("/tmp/cflow.txt","a+"); //int sem_val = semctl(this->_semId[app], this->_currentBuffer, GETVAL, 0); //fprintf(file,"Released sem %d val: %ld by app %d\n", this->_currentBuffer, sem_val, (int)appnum); //fclose(file); return(0); } //------------------------------------------------------------------------- // int CflowdPacketQueue::Enqueue(ipv4addr_t ciscoIpAddr, // const unsigned char *pkt, int len) //......................................................................... // //------------------------------------------------------------------------- int CflowdPacketQueue::Enqueue(ipv4addr_t ciscoIpAddr, const unsigned char *pkt, int len) { // If we're at the end of the current buffer, switch to the other buffer. if ((this->_writePtr + k_maxFlowPacketSize) > (this->_mapAddr[this->_currentBuffer] + this->_mapLength/2)) { if (this->ToggleBuffers() < 0) { syslog(LOG_CRIT,"[C] failed to toggle buffers! {%s:%d}", __FILE__,__LINE__); return(-1); } this->NumPackets(0); } // copy in the Cisco source IP address. memcpy(this->_writePtr,&ciscoIpAddr,sizeof(ciscoIpAddr)); this->_writePtr += sizeof(ciscoIpAddr); // copy in the packet. memcpy(this->_writePtr,pkt,len); this->_writePtr += (k_maxFlowPacketSize - sizeof(ciscoIpAddr)); this->NumPackets(this->NumPackets() + 1); return(0); } //------------------------------------------------------------------------- // const char *CflowdPacketQueue::GetPacket(ipv4addr_t & ciscoIpAddr) //......................................................................... // //------------------------------------------------------------------------- const char *CflowdPacketQueue::GetPacket(ipv4addr_t & ciscoIpAddr) { ipv4addr_t ipAddr; // get the source address of the packet memcpy(&ipAddr,this->_readPtr,sizeof(ipAddr)); ciscoIpAddr = ipAddr; // and then the payload char *pktPtr = this->_readPtr + sizeof(ipAddr); this->_readPtr += k_maxFlowPacketSize; return(pktPtr); } //------------------------------------------------------------------------- // int CflowdPacketQueue::ToggleBuffers(bool alreadyReleased) //......................................................................... // //------------------------------------------------------------------------- int CflowdPacketQueue::ToggleBuffers(bool alreadyReleased) { int otherBuffer; if (this->_currentBuffer) otherBuffer = 0; else otherBuffer = 1; if (!alreadyReleased) { // specify which app is doing ReleaseLock - STN 9/12/00 if (this->ReleaseLock(alreadyReleased) < 0) { syslog(LOG_ERR,"[E] failed to release lock in ToggleBuffers()!"); return(-1); } } this->_currentBuffer = otherBuffer; // specify which app is doing GetLock - STN 9/12/00 if (this->GetLock(alreadyReleased) < 0) { syslog(LOG_ERR,"[E] failed to get lock in ToggleBuffers()!"); return(-1); } this->_writePtr = this->_mapAddr[this->_currentBuffer] + sizeof(int); this->_readPtr = this->_mapAddr[this->_currentBuffer] + sizeof(int); time(&(this->_lastToggle)); otherBuffer = (otherBuffer) ? 0 : 1; return(0); } //------------------------------------------------------------------------- // int CflowdPacketQueue::NumPackets() //......................................................................... // //------------------------------------------------------------------------- int CflowdPacketQueue::NumPackets() { int numPackets; memcpy(&numPackets,this->_mapAddr[this->_currentBuffer],sizeof(numPackets)); return(numPackets); } //------------------------------------------------------------------------- // int CflowdPacketQueue::NumPackets(int numPackets) //......................................................................... // //------------------------------------------------------------------------- int CflowdPacketQueue::NumPackets(int numPackets) { memcpy(this->_mapAddr[this->_currentBuffer],&numPackets,sizeof(numPackets)); return(numPackets); }