/* * THIS FILE IS UNDER RCS - DO NOT MODIFY UNLESS YOU HAVE * CHECKED IT OUT USING THE COMMAND CHECKOUT. * * $Id: fir.c,v 1.6 2008/02/12 18:47:36 withers Exp $ * * Revision history: * $Log: fir.c,v $ * Revision 1.6 2008/02/12 18:47:36 withers * moved nsamp test to after byte swap * * Revision 1.5 2007/02/26 19:52:50 paulf * fixed a ton of windows warnings related to time_t again * * Revision 1.4 2007/02/14 19:57:19 luetgert * Fixed problem when encountering a zero-length trcebuf. * . * * Revision 1.3 2004/07/28 22:43:05 lombard * Modified to handle SCNLs and TYPE_TRACEBUF2 (only!) messages. * * Revision 1.2 2001/05/09 22:31:16 dietz * Changed to shut down gracefully if the transport flag is * set to TERMINATE or Fir.MyPid. * * Revision 1.1 2000/02/14 17:27:23 lucky * Initial revision * * */ /* * fir.c: * */ #include #include #include #include #include #include #include #include #include "fir.h" main (int argc, char **argv) { WORLD Fir; /* Our main data structure */ time_t timeNow; /* current time */ time_t timeLastBeat; /* time last heartbeat was sent */ long sizeMsg; /* size of retrieved message */ SHM_INFO regionIn; /* Input shared memory region info. */ MSG_LOGO logoWave; /* Logo of requested waveforms. */ MSG_LOGO logoMsg; /* logo of retrieved message */ unsigned tidFir; /* Filter thread id */ char *inBuf; /* Pointer to the input message buffer. */ int inBufLen; /* Size of input message buffer */ TRACE2_HEADER *WaveHeader; int ret; char msgText[MAXMESSAGELEN]; /* string for log/error messages */ /* Check command line arguments ******************************/ if (argc != 2) { fprintf (stderr, "Usage: fir \n"); exit (EW_FAILURE); } /* Read config file and configure the decimator */ if (Configure(&Fir, argv) != EW_SUCCESS) { fprintf (stderr, "fir: configure() failed \n"); exit (EW_FAILURE); } if ( Fir.firParam.testMode ) { logit("e", "Fir terminating normally for test mode; see log for results\n"); exit (EW_SUCCESS); } /* We will put the station index ine front of the trace message, so we * * don't have to look up the SCNL again at the other end of the queue. */ inBufLen = MAX_TRACEBUF_SIZ + sizeof( int ); if ( ! ( inBuf = (char *) malloc( (size_t) inBufLen ) ) ) { logit( "e", "%s: Memory allocation failed - initial message buffer!\n", argv[0] ); exit( EW_FAILURE ); } WaveHeader = (TRACE2_HEADER *) (inBuf + sizeof(int)); /* Attach to Input shared memory ring *******************************************/ tport_attach (®ionIn, Fir.firEWH.ringInKey); if (Fir.firParam.debug) logit ("", "Attached to public memory region %s: %d\n", Fir.firParam.ringIn, Fir.firEWH.ringInKey); /* Attach to Output shared memory ring *******************************************/ if (Fir.firEWH.ringOutKey == Fir.firEWH.ringInKey) Fir.regionOut = regionIn; else tport_attach (&(Fir.regionOut), Fir.firEWH.ringOutKey); if (Fir.firParam.debug) logit ("", "Attached to public memory region %s: %d\n", Fir.firParam.ringOut, Fir.firEWH.ringOutKey); /* Specify logos of incoming waveforms */ logoWave.instid = Fir.firEWH.readInstId; logoWave.mod = Fir.firEWH.readModId; logoWave.type = Fir.firEWH.typeWaveform; /* Specify logos of outgoing messages */ Fir.hrtLogo.instid = Fir.firEWH.myInstId; Fir.hrtLogo.mod = Fir.firEWH.myModId; Fir.hrtLogo.type = Fir.firEWH.typeHeartbeat; Fir.errLogo.instid = Fir.firEWH.myInstId; Fir.errLogo.mod = Fir.firEWH.myModId; Fir.errLogo.type = Fir.firEWH.typeError; Fir.trcLogo.instid = Fir.firEWH.myInstId; Fir.trcLogo.mod = Fir.firEWH.myModId; Fir.trcLogo.type = Fir.firEWH.typeWaveform; /* Force a heartbeat to be issued in first pass thru main loop */ timeLastBeat = time (&timeNow) - Fir.firParam.heartbeatInt - 1; /* Flush the incoming transport ring */ while (tport_getmsg (®ionIn, &logoWave, 1, &logoMsg, &sizeMsg, inBuf, inBufLen) != GET_NONE); /* Create MsgQueue mutex */ CreateMutex_ew(); /* Allocate the message Queue */ initqueue (&(Fir.MsgQueue), QUEUE_SIZE, inBufLen); /* Start decimator thread which will read messages from * * the Queue, fir them and write them to the OutRing */ if (StartThreadWithArg (FirThread, (void *) &Fir, (unsigned) THREAD_STACK, &tidFir) == -1) { logit( "e", "fir: Error starting Fir thread. Exitting.\n"); tport_detach (®ionIn); tport_detach (&(Fir.regionOut)); exit (EW_FAILURE); } Fir.FirStatus = 0; /*assume the best*/ /*--------------------- setup done; start main loop -------------------------*/ while (tport_getflag (®ionIn) != TERMINATE && tport_getflag (®ionIn) != Fir.MyPid ) { /* send fir' heartbeat */ if (time (&timeNow) - timeLastBeat >= Fir.firParam.heartbeatInt) { timeLastBeat = timeNow; StatusReport (&Fir, Fir.firEWH.typeHeartbeat, 0, ""); } if (Fir.FirStatus < 0) { logit ("et", "fir: Filter thread died. Exitting\n"); exit (EW_FAILURE); } ret = tport_getmsg (®ionIn, &logoWave, 1, &logoMsg, &sizeMsg, inBuf + sizeof(int), inBufLen - sizeof(int)); /* Check return code; report errors */ if (ret != GET_OK) { if (ret == GET_TOOBIG) { sprintf (msgText, "msg[%ld] i%d m%d t%d too long for target", sizeMsg, (int) logoMsg.instid, (int) logoMsg.mod, (int)logoMsg.type); StatusReport (&Fir, Fir.firEWH.typeError, ERR_TOOBIG, msgText); continue; } else if (ret == GET_MISS) { sprintf (msgText, "missed msg(s) i%d m%d t%d in %s", (int) logoMsg.instid, (int) logoMsg.mod, (int)logoMsg.type, Fir.firParam.ringIn); StatusReport (&Fir, Fir.firEWH.typeError, ERR_MISSMSG, msgText); } else if (ret == GET_NOTRACK) { sprintf (msgText, "no tracking for logo i%d m%d t%d in %s", (int) logoMsg.instid, (int) logoMsg.mod, (int)logoMsg.type, Fir.firParam.ringIn); StatusReport (&Fir, Fir.firEWH.typeError, ERR_NOTRACK, msgText); } else if (ret == GET_NONE) { sleep_ew(500); continue; } } /* Check to see if msg's SCNL code is desired. Note that we don't need * * to do byte-swapping before we can read the SCNL. */ if ((ret = matchSCNL(WaveHeader, &Fir )) < -1 ) { logit ("et", "Fir: Call to matchSCNL failed; exitting.\n"); exit (EW_FAILURE); } else if ( ret == -1 ) /* Not an SCNL we want */ continue; /* stick the SCNL number as an int at the front of the message */ *((int*)inBuf) = ret; /* If necessary, swap bytes in the wave message */ if (WaveMsg2MakeLocal (WaveHeader) < 0) { logit ("et", "Fir: Unknown waveform type.\n"); continue; } if(WaveHeader->nsamp <= 0) { logit ("et", "Fir: Zero length tracebuf. %s_%s_%s_%s\n", WaveHeader->sta, WaveHeader->chan, WaveHeader->net, WaveHeader->loc); continue; } /* Queue retrieved msg */ RequestMutex (); ret = enqueue (&(Fir.MsgQueue), inBuf, sizeMsg + sizeof(int), logoMsg); ReleaseMutex_ew (); if (ret != 0) { if (ret == -1) { sprintf (msgText, "Message too large for queue; Lost message."); StatusReport (&Fir, Fir.firEWH.typeError, ERR_QUEUE, msgText); continue; } if (ret == -3) { sprintf (msgText, "Queue full. Old messages lost."); StatusReport (&Fir, Fir.firEWH.typeError, ERR_QUEUE, msgText); continue; } } /* problem from enqueue */ } /* wait until TERMINATE is raised */ /* Termination has been requested */ tport_detach (®ionIn); tport_detach (&(Fir.regionOut)); logit ("t", "Termination requested; exitting!\n" ); exit (EW_SUCCESS); }