/**************************************************************************/
/*!
    @file     DidacticNet.cpp
    @author   anian buehler @ letsgoING.org
*/
/**************************************************************************/

#include "Arduino.h"

#include "DidacticNet.h"

//**************************************************************************
//ROOT
//**************************************************************************
DidacticPSNet::DidacticPSNet(){}

DidacticPSNet::~DidacticPSNet(){}

// Attach the network to a stream. No callback is registered by this overload.
void DidacticPSNet::begin(Stream& _port){
	setStream(_port);
}

// Attach the network to a stream and register the receive callback.
void DidacticPSNet::begin(Stream& _port, PSNET_CALLBACK_SIGNATURE){
	setStream(_port);
	setCallback(callback);
}

// Store the callback that handleData() invokes; returns *this for chaining.
DidacticPSNet& DidacticPSNet::setCallback(PSNET_CALLBACK_SIGNATURE){
	this->callback = callback;
	return *this;
}

// Remember the stream used for all reads and writes.
void DidacticPSNet::setStream(Stream& stream){
	_port = &stream;
}

// Run one iteration of the network state machine.
//  1. If bytes are available, read them; when a complete frame that passes
//     getMessageFilter() has arrived, dispatch it to handleData(). Any
//     incoming traffic re-arms the random CSMA back-off.
//  2. If a message is pending and both the send interval and the CSMA
//     back-off have expired, wait CSMA_CHECK_DELAY_US, re-check that the
//     line is still idle and then send.
// Returns true only when a pending message was sent during this call.
bool DidacticPSNet::handleNetwork(){
	if(checkData()){
		if(recieveData()){
			if(getMessageFilter(_readBufferMessage[1])){
				handleData();
			}
		}
		_waitingTimeCSMA = millis() + random(CSMA_MIN_DELAY_MS, CSMA_MAX_DELAY_MS);
	}

	// NOTE(review): these deadline comparisons are not millis()-rollover safe;
	// fixing that depends on the member types declared in DidacticNet.h.
	if(_dataToSend && _waitingTimeSend <= millis() && _waitingTimeCSMA <= millis()){
		//TODO: test added CSMA_CHECKDELAY + 2nd checkData()
		// Busy-wait instead of delayMicroseconds(): the latter was removed
		// because it was blocking SoftSerial interrupts. The elapsed time is
		// computed as a difference so that a micros() rollover cannot end the
		// wait early or stretch it.
		const unsigned long delayStartTime = micros();
		while(micros() - delayStartTime < static_cast<unsigned long>(CSMA_CHECK_DELAY_US)){
			// wait
		}

		if(!checkData()){
			sendData();
			_dataToSend = false;
			_waitingTimeSend = millis() + _intervalTime;
			return true;
		}
	}
	return false;
}

// True while a prepared message is waiting in the send buffer.
bool DidacticPSNet::isDataToSend(){
	return _dataToSend;
}

// Write the send buffer to the stream, byte by byte, up to and including the
// first MSG_DELIMITER. If no delimiter is found before the maximum message
// length, one is forced at the last position so the frame is cut there.
void DidacticPSNet::sendData(){
	const int lastIndex = MAX_LEN_TOPICS + MAX_LEN_PAYLOAD + LEN_OVERHEAD - 1; //TODO: check!!!
	for(int index = 0; ; index++){
		if(index >= lastIndex){
			_sendBufferMessage[index] = MSG_DELIMITER; //cut message and stop sending
		}
		_port->write(_sendBufferMessage[index]);
		if(_sendBufferMessage[index] == MSG_DELIMITER){
			return;
		}
	}
}

// Copy characters from the read buffer, starting at startCounter, into buffer
// until `limiter` is found or maxLength characters were copied. The result is
// always NUL-terminated, so `buffer` must hold at least maxLength + 1 chars.
// Returns the number of characters copied (without the terminator).
int DidacticPSNet::extractData(int startCounter, int maxLength, char* buffer, char limiter){
	int length = 0;
	while(length < maxLength && _readBufferMessage[startCounter + length] != limiter){
		buffer[length] = _readBufferMessage[startCounter + length];
		length++;
	}
	buffer[length] = '\0';
	return length;
}

// Number of bytes waiting on the stream (0 if none).
int DidacticPSNet::checkData(){
	return static_cast<int>(_port->available());
}

// Drain the stream into the read buffer and assemble one frame.
// A frame starts at MSG_PRELIMITER and ends at MSG_DELIMITER; bytes received
// while no frame is open are ignored. Returns true as soon as a complete frame
// (NUL-terminated) is in _readBufferMessage, false if the stream ran dry first.
// The position is kept in a function-local static so a frame may arrive
// spread over several calls.
bool DidacticPSNet::recieveData() {
	static int msgCounter = 0;

	// Highest index a frame byte may occupy, mirroring the limit sendData()
	// applies to the send buffer. Assumes _readBufferMessage has room for one
	// more char (the terminator) beyond this index - TODO confirm against the
	// declaration in DidacticNet.h.
	const int lastMsgIndex = MAX_LEN_TOPICS + MAX_LEN_PAYLOAD + LEN_OVERHEAD - 1;

	while (checkData()) {
		const char received = _port->read();

		if (received == MSG_PRELIMITER) {
			// start of a new frame: restart from the beginning of the buffer
			msgCounter = 0;
			_readBufferMessage[0] = received;
			continue;
		}
		if (_readBufferMessage[0] != MSG_PRELIMITER) {
			continue; // no frame open -> ignore byte
		}
		if (msgCounter >= lastMsgIndex) {
			// Frame is longer than any valid message: drop it and wait for the
			// next MSG_PRELIMITER instead of writing past the buffer.
			_readBufferMessage[0] = '\0';
			msgCounter = 0;
			continue;
		}

		msgCounter++;
		_readBufferMessage[msgCounter] = received;

		if (received == MSG_DELIMITER) {
			_readBufferMessage[msgCounter + 1] = '\0'; // terminate the frame
			msgCounter = 0;
			return true;
		}
	}
	return false;
}

// Set the minimum time between two sent messages (added to millis()).
void DidacticPSNet::setInterval(long intervalTime){
	_intervalTime = intervalTime;
}

//**************************************************************************
// CLIENT
//**************************************************************************
DidacticPSNetClient::DidacticPSNetClient(){
	_intervalTime = INTERVAL_CLIENT;
}

DidacticPSNetClient::~DidacticPSNetClient(){}

// Publish NUL-terminated topic/payload strings.
int DidacticPSNetClient::publish(char* topic, char* payload){
	return publish(topic, strlen(topic), payload, strlen(payload));
}

// Build a publish frame in the send buffer and mark it as pending:
//   MSG_PRELIMITER, MSG_PUBLISH, topic, MSG_SEPARATOR, payload, MSG_DELIMITER
// Over-long topics/payloads are truncated to MAX_LEN_TOPICS / MAX_LEN_PAYLOAD
// and reported by adding DN_ERROR_TOPIC_LEN / DN_ERROR_PAYLOAD_LEN to the
// returned value (which starts at DN_PUBLISH_SUCCESSULL).
// The frame is only prepared here; it is sent by handleNetwork().
int DidacticPSNetClient::publish(char* topic, int topicLength, char* payload , int payloadLength){
	int error = DN_PUBLISH_SUCCESSULL;

	// Clamp BEFORE computing any buffer offsets so that the frame markers end
	// up directly behind the (possibly truncated) data and inside the buffer.
	if(topicLength > MAX_LEN_TOPICS){
		topicLength = MAX_LEN_TOPICS;
		error += DN_ERROR_TOPIC_LEN;
	}
	if(payloadLength > MAX_LEN_PAYLOAD){
		payloadLength = MAX_LEN_PAYLOAD;
		error += DN_ERROR_PAYLOAD_LEN;
	}

	const int payloadStart = 2 + topicLength + 1;

	_sendBufferMessage[0] = MSG_PRELIMITER;
	_sendBufferMessage[1] = MSG_PUBLISH;
	for(int i = 0; i < topicLength; i++){
		_sendBufferMessage[2 + i] = topic[i];
	}
	_sendBufferMessage[2 + topicLength] = MSG_SEPARATOR;
	for(int i = 0; i < payloadLength; i++){
		_sendBufferMessage[payloadStart + i] = payload[i];
	}
	_sendBufferMessage[payloadStart + payloadLength]     = MSG_DELIMITER;
	_sendBufferMessage[payloadStart + payloadLength + 1] = '\0';

	_dataToSend = true;
	return error;
}

// Publish an integer as its decimal text representation.
int DidacticPSNetClient::publish(char* topic, int data){
	// 3 digits per byte is an upper bound for the decimal length of an int;
	// +2 for sign and terminator. Independent of MAX_LEN_PAYLOAD on purpose:
	// publish() truncates the text if it does not fit into a payload.
	char sendPayload[3 * sizeof(int) + 2];
	itoa(data, sendPayload, 10);
	return publish(topic, sendPayload);
}

// Publish a boolean as "1" (true) or "0" (false).
int DidacticPSNetClient::publish(char* topic, bool data){
	char sendPayload[2] = { data ? '1' : '0', '\0' };
	return publish(topic, sendPayload);
}

// Publish `input` only if no message is pending and the edge detector reports
// a change of `input`. Returns the publish() result, or DN_ERROR_NO_ERROR if
// nothing was published.
int DidacticPSNetClient::publishOnChange(char* topic, bool input){
	if(!_dataToSend && eDetector.edgeDetected(input)){
		return publish(topic, input);
	}
	return DN_ERROR_NO_ERROR;
}

// Publish `input` only if no message is pending and the change detector
// reports that `input` moved by more than `threshold`. Returns the publish()
// result, or DN_ERROR_NO_ERROR if nothing was published.
int DidacticPSNetClient::publishOnChange(char* topic, int input, int threshold){
	if(!_dataToSend && cDetector.valueChanged(input, threshold)){
		return publish(topic, input);
	}
	return DN_ERROR_NO_ERROR;
}

// Subscribe to a NUL-terminated topic.
int DidacticPSNetClient::subscribe(char* topic){
	return subscribe(topic, strlen(topic));
}

// Register `topic` locally (if not already covered by a stored topic or
// wildcard) and send a subscribe frame:
//   MSG_PRELIMITER, MSG_SUBSCRIBE, topic, MSG_DELIMITER
// Blocks, calling handleNetwork(), until the frame has been sent.
// Returns DN_ERROR_TOPIC_LEN if the topic had to be truncated, otherwise
// DN_ERROR_NO_ERROR.
int DidacticPSNetClient::subscribe(char* topic, int topicLength){
	int error = DN_ERROR_NO_ERROR;

	if(topicLength > MAX_LEN_TOPICS){
		topicLength = MAX_LEN_TOPICS;
		error = DN_ERROR_TOPIC_LEN;
	}

	if(getTopicOrWildcardNr(topic) < 0){
		// not known yet: store it in the first free slot
		int topicNumber = getFreeTopicNr();
		if(topicNumber < 0){
			topicNumber = 0; // table full: slot 0 is overwritten
		}
		for(int i = 0; i < topicLength; i++){
			_topic[topicNumber][i] = topic[i];
		}
		_topic[topicNumber][topicLength] = '\0';
	}

	_sendBufferMessage[0] = MSG_PRELIMITER;
	_sendBufferMessage[1] = MSG_SUBSCRIBE;
	for(int i = 0; i < topicLength; i++){
		_sendBufferMessage[2 + i] = topic[i];
	}
	_sendBufferMessage[2 + topicLength]     = MSG_DELIMITER;
	_sendBufferMessage[2 + topicLength + 1] = '\0';
	_dataToSend = true;

	// block until handleNetwork() got the request onto the wire
	while(_dataToSend){
		handleNetwork();
	}
	return error;
}

// Unsubscribe from a NUL-terminated topic.
bool DidacticPSNetClient::unsubscribe(char* topic){
	return unsubscribe(topic, strlen(topic));
}

// Remove `topic` from the local subscription table by clearing its slot.
// Returns false if the topic was not stored. Nothing is sent to the network.
// topicLength is currently unused: the lookup compares the whole string.
bool DidacticPSNetClient::unsubscribe(char* topic, int topicLength){
	const int topicNumber = getTopicNr(topic);
	if(topicNumber < 0){
		return false;
	}
	_topic[topicNumber][0] = '\0';
	return true;
}

// Clients only react to update frames.
bool DidacticPSNetClient::getMessageFilter(char messageType){
	return messageType == MSG_UPDATE;
}

// Store a NUL-terminated payload in the payload slot `position`.
bool DidacticPSNetClient::savePayload(char* buffer, int position){
	strcpy(_payload[position], buffer);
	return true;
}

// Handle a received update frame: extract topic and payload from the read
// buffer and, if the topic matches a subscription (or a stored wildcard),
// save the payload and invoke the user callback. Always returns true.
bool DidacticPSNetClient::handleData(){
	const int topicLength = extractData(2, MAX_LEN_TOPICS, _bufferTopic, MSG_SEPARATOR);
	if(topicLength <= 0){
		return true; // frame without topic: nothing to do
	}

	const int currentTopicNr = getTopicOrWildcardNr(_bufferTopic);
	// payload starts behind PRELIMITER, type, topic and SEPARATOR
	const int payloadLength = extractData(topicLength + 3, MAX_LEN_PAYLOAD, _bufferPayload, MSG_DELIMITER);

	if(currentTopicNr >= 0){
		savePayload(_bufferPayload, currentTopicNr);
		// begin(Stream&) registers no callback, so guard the call.
		// Assumes an unset callback compares as null - TODO confirm how the
		// member is initialised in DidacticNet.h.
		if(callback){
#ifdef CALLBACK_W_LENGTH
			callback(_bufferTopic, topicLength, _bufferPayload, payloadLength);
#else
			callback(_bufferTopic, _bufferPayload);
#endif
		}
	}
	return true;
}

// Index of the first slot whose stored topic equals `topic` or whose stored
// topic starts with MSG_TOPIC_MULTI; -1 if there is none.
int DidacticPSNetClient::getTopicOrWildcardNr(char* topic){
	for (int i = 0; i < MAX_NR_TOPICS_CLIENT; i++) {
		//TODO: check ... or equal MSG_TOPIC_MULTI
		if (strcmp(_topic[i], topic) == 0 || _topic[i][0] == MSG_TOPIC_MULTI) {
			return i;
		}
	}
	return -1;
}

// Index of the slot whose stored topic equals `topic` exactly; -1 if none.
int DidacticPSNetClient::getTopicNr(char* topic){
	for (int i = 0; i < MAX_NR_TOPICS_CLIENT; i++) {
		if (strcmp(_topic[i], topic) == 0) {
			return i;
		}
	}
	return -1;
}

// Index of the first empty topic slot; -1 if every slot is in use.
int DidacticPSNetClient::getFreeTopicNr() {
	for (int i = 0; i < MAX_NR_TOPICS_CLIENT; i++) {
		if (_topic[i][0] == '\0') {
			return i;
		}
	}
	return -1;
}

// Copy the topic stored in slot `number` into `topic`.
// Valid slots are 0 .. getMaxNrTopics()-1 (slot 0 included).
// Returns DN_ERROR_NO_TOPIC for any other number.
int DidacticPSNetClient::getSubscribedTopic(char* topic, int number){
	if(number >= 0 && number < getMaxNrTopics()){
		strcpy(topic, _topic[number]);
		return DN_ERROR_NO_ERROR;
	}
	return DN_ERROR_NO_TOPIC;
}

// Number of leading topic slots in use, i.e. the index of the first free
// slot, or MAX_NR_TOPICS_CLIENT when no slot is free.
int DidacticPSNetClient::getMaxNrTopics(){
	const int firstFree = getFreeTopicNr();
	return (firstFree < 0) ? MAX_NR_TOPICS_CLIENT : firstFree;
}

//**************************************************************************
//Broker
//**************************************************************************
DidacticPSNetBroker::DidacticPSNetBroker(){
	_intervalTime = INTERVAL_BROKER;
}

DidacticPSNetBroker::~DidacticPSNetBroker(){}

// The broker reacts to publish and subscribe frames.
bool DidacticPSNetBroker::getMessageFilter(char messageType){
	return (messageType == MSG_PUBLISH || messageType == MSG_SUBSCRIBE);
}

// Store a NUL-terminated payload in the data slot `position`.
bool DidacticPSNetBroker::savePayload(char* buffer, int position){
	strcpy(_data[position], buffer);
	return true;
}

// Store newData in slot topicNumber; if the slot has no topic yet, it is
// claimed for usedTopic first.
void DidacticPSNetBroker::writeDataToTopic(int topicNumber, char* usedTopic, char* newData) {
	if(_topic[topicNumber][0] == '\0'){
		strcpy(_topic[topicNumber], usedTopic);
	}
	strcpy(_data[topicNumber], newData);
}

// Handle a received frame.
//  - MSG_PUBLISH:   store the payload under its topic and queue an update
//                   frame carrying the new value.
//  - MSG_SUBSCRIBE: queue an update frame with the value currently stored
//                   for the requested topic.
// Always returns true.
bool DidacticPSNetBroker::handleData(){
	const char messageType = _readBufferMessage[1];

	if(messageType == MSG_PUBLISH){
		const int topicLength = extractData(2, MAX_LEN_TOPICS, _bufferTopic, MSG_SEPARATOR);
		if(topicLength > 0){
			const int currentTopicNr = getTopicOrWildcardNr(_bufferTopic);
			// payload starts behind PRELIMITER, type, topic and SEPARATOR
			const int dataLength = extractData(topicLength + 3, MAX_LEN_PAYLOAD, _bufferPayload, MSG_DELIMITER);
			if(currentTopicNr >= 0){
				writeDataToTopic(currentTopicNr, _bufferTopic, _bufferPayload);
				update(_topic[currentTopicNr], topicLength, _data[currentTopicNr], dataLength);
			}
		}
	}
	else if(messageType == MSG_SUBSCRIBE){
		const int topicLength = extractData(2, MAX_LEN_TOPICS, _bufferTopic, MSG_DELIMITER);
		if(topicLength > 0){
			// NOTE(review): for a topic the broker has never seen this returns
			// a free (empty) slot, so an update with an empty topic is queued.
			// Left unchanged; confirm whether that is intended.
			const int currentTopicNr = getTopicOrWildcardNr(_bufferTopic);
			if(currentTopicNr >= 0){
				update(_topic[currentTopicNr], strlen(_topic[currentTopicNr]), _data[currentTopicNr], strlen(_data[currentTopicNr]));
			}
		}
	}
	return true;
}

// Build an update frame in the send buffer and mark it as pending:
//   MSG_PRELIMITER, MSG_UPDATE, topic, MSG_SEPARATOR, data, MSG_DELIMITER
// Returns false (and clears the pending flag) without touching the send
// buffer if topic or data exceed MAX_LEN_TOPICS / MAX_LEN_PAYLOAD.
bool DidacticPSNetBroker::update(char* topic, int topicLength, char* data , int dataLength){
	// Validate BEFORE writing: the marker offsets are derived from the
	// lengths, so over-long input must never reach the buffer.
	if(topicLength > MAX_LEN_TOPICS || dataLength > MAX_LEN_PAYLOAD){
		_dataToSend = false;
		return false;
	}

	const int dataStart = 2 + topicLength + 1;

	_sendBufferMessage[0] = MSG_PRELIMITER;
	_sendBufferMessage[1] = MSG_UPDATE;
	for(int i = 0; i < topicLength; i++){
		_sendBufferMessage[2 + i] = topic[i];
	}
	_sendBufferMessage[2 + topicLength] = MSG_SEPARATOR;
	for(int i = 0; i < dataLength; i++){
		_sendBufferMessage[dataStart + i] = data[i];
	}
	_sendBufferMessage[dataStart + dataLength]     = MSG_DELIMITER;
	_sendBufferMessage[dataStart + dataLength + 1] = '\0';

	_dataToSend = true;
	return true;
}

// Index of the slot whose stored topic equals `topic`; if there is none, the
// index of the first free slot (or -1 if the table is full).
int DidacticPSNetBroker::getTopicOrWildcardNr(char* topic){
	for (int i = 0; i < MAX_NR_TOPICS_BROKER; i++) {
		if (strcmp(_topic[i], topic) == 0) {
			return i;
		}
	}
	return getFreeTopicNr();
}

// Index of the first empty topic slot; -1 if every slot is in use.
int DidacticPSNetBroker::getFreeTopicNr() {
	for (int i = 0; i < MAX_NR_TOPICS_BROKER; i++) {
		if (_topic[i][0] == '\0') {
			return i;
		}
	}
	return -1;
}

//**************************************************************************
//LITTLE HELPERS FOR CLIENTS ;-)
//************************************************************************* EdgeDetector::EdgeDetector(){}; EdgeDetector::~EdgeDetector(){}; int EdgeDetector::edgeDetected(bool currentState){ static bool lastState = false; int edEdge = 0; if(currentState && !lastState){ edEdge = RISING; } else if(!currentState && lastState){ edEdge = FALLING; } lastState = currentState; return edEdge; } ChangeDetector::ChangeDetector(){} ChangeDetector::~ChangeDetector(){} bool ChangeDetector::valueChanged(int value, int threshold){ static int lastValue = 0; if(abs(value-lastValue) > threshold){ lastValue = value; return true; } return false; } UnblockingTimer::UnblockingTimer(){} UnblockingTimer::~UnblockingTimer(){} bool UnblockingTimer::timeElapsed(long delayTime){ long currentTime = millis(); if(lastTime + (delayTime-1) < currentTime){ lastTime = currentTime; return true; } return false; } SerialReader::SerialReader(){} SerialReader::~SerialReader(){} void SerialReader::begin(Stream& rsStream){ _port = &rsStream; } int SerialReader::readSerialData(char* rsDataArray, char rsEndSign) { if (_port->available()) { char charBuffer = _port->read(); rsDataArray[charCounter] = charBuffer; if (charBuffer == rsEndSign) { rsDataArray[charCounter] = '\0'; int nrOfChars = charCounter; charCounter = 0; return nrOfChars; } else { charCounter++; } } return 0; }