/* * FreeRTOS V202212.00 * Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of * the Software, and to permit persons to whom the Software is furnished to do so, * subject to the following conditions: * * The above copyright notice and this permission notice shall be included in all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. * * https://www.FreeRTOS.org * https://github.com/FreeRTOS * */ /* * Demo for showing use of the MQTT API using a mutually authenticated * network connection. * * The Example shown below uses MQTT APIs to create MQTT messages and send them * over the mutually authenticated network connection established with the * MQTT broker. This example is single threaded and uses statically allocated * memory. It uses QoS1 for sending to and receiving messages from the broker. * * A mutually authenticated TLS connection is used to connect to the * MQTT message broker in this example. Define democonfigMQTT_BROKER_ENDPOINT, * democonfigROOT_CA_PEM, democonfigCLIENT_CERTIFICATE_PEM, * and democonfigCLIENT_PRIVATE_KEY_PEM in demo_config.h to establish a * mutually authenticated connection. * * Also see https://www.freertos.org/mqtt/mqtt-agent-demo.html? for an * alternative run time model whereby coreMQTT runs in an autonomous * background agent task. Executing the MQTT protocol in an agent task * removes the need for the application writer to explicitly manage any MQTT * state or call the MQTT_ProcessLoop() API function. Using an agent task * also enables multiple application tasks to more easily share a single * MQTT connection. */ /* Standard includes. */ #include #include /* Kernel includes. */ #include "FreeRTOS.h" #include "task.h" /* Demo Specific configs. */ #include "demo_config.h" /* MQTT library includes. */ #include "core_mqtt.h" /* Exponential backoff retry include. */ #include "backoff_algorithm.h" /* Transport interface implementation include header for TLS. */ #include "transport_mbedtls.h" /*-----------------------------------------------------------*/ /* Compile time error for undefined configs. */ #ifndef democonfigMQTT_BROKER_ENDPOINT #error "Define the config democonfigMQTT_BROKER_ENDPOINT by following the instructions in file demo_config.h." #endif #ifndef democonfigROOT_CA_PEM #error "Please define Root CA certificate of the MQTT broker(democonfigROOT_CA_PEM) in demo_config.h." #endif /* If no username is defined, then a client certificate/key is required. */ #ifndef democonfigCLIENT_USERNAME /* *!!! Please note democonfigCLIENT_PRIVATE_KEY_PEM in used for *!!! convenience of demonstration only. Production devices should *!!! store keys securely, such as within a secure element. */ #ifndef democonfigCLIENT_CERTIFICATE_PEM #error "Please define client certificate(democonfigCLIENT_CERTIFICATE_PEM) in demo_config.h." #endif #ifndef democonfigCLIENT_PRIVATE_KEY_PEM #error "Please define client private key(democonfigCLIENT_PRIVATE_KEY_PEM) in demo_config.h." #endif #else /* If a username is defined, a client password also would need to be defined for * client authentication. */ #ifndef democonfigCLIENT_PASSWORD #error "Please define client password(democonfigCLIENT_PASSWORD) in demo_config.h for client authentication based on username/password." #endif /* AWS IoT MQTT broker port needs to be 443 for client authentication based on * username/password. */ #if defined( democonfigUSE_AWS_IOT_CORE_BROKER ) && democonfigMQTT_BROKER_PORT != 443 #error "Broker port(democonfigMQTT_BROKER_PORT) should be defined as 443 in demo_config.h for client authentication based on username/password in AWS IoT Core." #endif #endif /* ifndef democonfigCLIENT_USERNAME */ /*-----------------------------------------------------------*/ /* Default values for configs. */ #ifndef democonfigCLIENT_IDENTIFIER /** * @brief The MQTT client identifier used in this example. Each client identifier * must be unique so edit as required to ensure no two clients connecting to the * same broker use the same client identifier. * * @note Appending __TIME__ to the client id string will help to create a unique * client id every time an application binary is built. Only a single instance of * this application's compiled binary may be used at a time, since the client ID * will always be the same. */ #define democonfigCLIENT_IDENTIFIER "testClient"__TIME__ #endif #ifndef democonfigMQTT_BROKER_PORT /** * @brief The port to use for the demo. */ #define democonfigMQTT_BROKER_PORT ( 8883 ) #endif /*-----------------------------------------------------------*/ /** * @brief The maximum number of retries for network operation with server. */ #define mqttexampleRETRY_MAX_ATTEMPTS ( 5U ) /** * @brief The maximum back-off delay (in milliseconds) for retrying failed operation * with server. */ #define mqttexampleRETRY_MAX_BACKOFF_DELAY_MS ( 5000U ) /** * @brief The base back-off delay (in milliseconds) to use for network operation retry * attempts. */ #define mqttexampleRETRY_BACKOFF_BASE_MS ( 500U ) /** * @brief Timeout for receiving CONNACK packet in milliseconds. */ #define mqttexampleCONNACK_RECV_TIMEOUT_MS ( 5000U ) /** * @brief The topic to subscribe and publish to in the example. * * The topic name starts with the client identifier to ensure that each demo * interacts with a unique topic name. */ #define mqttexampleTOPIC democonfigCLIENT_IDENTIFIER "/example/topic" /** * @brief The number of topic filters to subscribe. */ #define mqttexampleTOPIC_COUNT ( 1 ) /** * @brief The MQTT message published in this example. */ #define mqttexampleMESSAGE "Hello World!" /** * @brief Time in ticks to wait between each cycle of the demo implemented * by prvMQTTDemoTask(). */ #define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS ( pdMS_TO_TICKS( 5000U ) ) /** * @brief Timeout for MQTT_ProcessLoop in milliseconds. */ #define mqttexamplePROCESS_LOOP_TIMEOUT_MS ( 5000U ) /** * @brief Keep alive time reported to the broker while establishing * an MQTT connection. * * It is the responsibility of the Client to ensure that the interval between * Control Packets being sent does not exceed the this Keep Alive value. In the * absence of sending any other Control Packets, the Client MUST send a * PINGREQ Packet. */ #define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 60U ) /** * @brief Delay (in ticks) between consecutive cycles of MQTT publish operations in a * demo iteration. * * Note that the process loop also has a timeout, so the total time between * publishes is the sum of the two delays. */ #define mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS ( pdMS_TO_TICKS( 2000U ) ) /** * @brief Transport timeout in milliseconds for transport send and receive. */ #define mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ( 5000U ) /** * @brief Transport timeout in milliseconds for transport send and receive. */ #define mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ( 200U ) /** * @brief The length of the outgoing publish records array used by the coreMQTT * library to track QoS > 0 packet ACKS for outgoing publishes. */ #define mqttexampleOUTGOING_PUBLISH_RECORD_LEN ( 10U ) /** * @brief The length of the incoming publish records array used by the coreMQTT * library to track QoS > 0 packet ACKS for incoming publishes. */ #define mqttexampleINCOMING_PUBLISH_RECORD_LEN ( 10U ) /** * Provide default values for undefined configuration settings. */ #ifndef democonfigOS_NAME #define democonfigOS_NAME "FreeRTOS" #endif #ifndef democonfigOS_VERSION #define democonfigOS_VERSION tskKERNEL_VERSION_NUMBER #endif #ifndef democonfigHARDWARE_PLATFORM_NAME #define democonfigHARDWARE_PLATFORM_NAME "WinSim" #endif #ifndef democonfigMQTT_LIB #define democonfigMQTT_LIB "core-mqtt@"MQTT_LIBRARY_VERSION #endif /** * @brief The MQTT metrics string expected by AWS IoT. */ #define AWS_IOT_METRICS_STRING \ "?SDK=" democonfigOS_NAME "&Version=" democonfigOS_VERSION \ "&Platform=" democonfigHARDWARE_PLATFORM_NAME "&MQTTLib=" democonfigMQTT_LIB /** * @brief Milliseconds per second. */ #define MILLISECONDS_PER_SECOND ( 1000U ) /** * @brief Milliseconds per FreeRTOS tick. */ #define MILLISECONDS_PER_TICK ( MILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) /*-----------------------------------------------------------*/ /** * @brief Each compilation unit that consumes the NetworkContext must define it. * It should contain a single pointer to the type of your desired transport. * When using multiple transports in the same compilation unit, define this pointer as void *. * * @note Transport stacks are defined in FreeRTOS-Plus/Source/Application-Protocols/network_transport. */ struct NetworkContext { TlsTransportParams_t * pParams; }; /*-----------------------------------------------------------*/ /** * @brief The task used to demonstrate the MQTT API. * * @param[in] pvParameters Parameters as passed at the time of task creation. Not * used in this example. */ static void prvMQTTDemoTask( void * pvParameters ); /** * @brief Connect to MQTT broker with reconnection retries. * * If connection fails, retry is attempted after a timeout. * Timeout value will exponentially increase until maximum * timeout value is reached or the number of attempts are exhausted. * * @param[out] pxNetworkContext The parameter to return the created network context. * * @return The status of the final connection attempt. */ static TlsTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkCredentials_t * pxNetworkCredentials, NetworkContext_t * pNetworkContext ); /** * @brief Sends an MQTT Connect packet over the already connected TLS over TCP connection. * * @param[in, out] pxMQTTContext MQTT context pointer. * @param[in] xNetworkContext Network context. */ static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext, NetworkContext_t * pxNetworkContext ); /** * @brief Function to update variable #xTopicFilterContext with status * information from Subscribe ACK. Called by the event callback after processing * an incoming SUBACK packet. * * @param[in] Server response to the subscription request. */ static void prvUpdateSubAckStatus( MQTTPacketInfo_t * pxPacketInfo ); /** * @brief Subscribes to the topic as specified in mqttexampleTOPIC at the top of * this file. In the case of a Subscribe ACK failure, then subscription is * retried using an exponential backoff strategy with jitter. * * @param[in] pxMQTTContext MQTT context pointer. */ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ); /** * @brief Publishes a message mqttexampleMESSAGE on mqttexampleTOPIC topic. * * @param[in] pxMQTTContext MQTT context pointer. */ static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext ); /** * @brief Unsubscribes from the previously subscribed topic as specified * in mqttexampleTOPIC. * * @param[in] pxMQTTContext MQTT context pointer. */ static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ); /** * @brief The timer query function provided to the MQTT context. * * @return Time in milliseconds. */ static uint32_t prvGetTimeMs( void ); /** * @brief Process a response or ack to an MQTT request (PING, PUBLISH, * SUBSCRIBE or UNSUBSCRIBE). This function processes PINGRESP, PUBACK, * SUBACK, and UNSUBACK. * * @param[in] pxIncomingPacket is a pointer to structure containing deserialized * MQTT response. * @param[in] usPacketId is the packet identifier from the ack received. */ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, uint16_t usPacketId ); /** * @brief Process incoming Publish message. * * @param[in] pxPublishInfo is a pointer to structure containing deserialized * Publish message. */ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ); /** * @brief The application callback function for getting the incoming publishes, * incoming acks, and ping responses reported from the MQTT library. * * @param[in] pxMQTTContext MQTT context pointer. * @param[in] pxPacketInfo Packet Info pointer for the incoming packet. * @param[in] pxDeserializedInfo Deserialized information from the incoming packet. */ static void prvEventCallback( MQTTContext_t * pxMQTTContext, MQTTPacketInfo_t * pxPacketInfo, MQTTDeserializedInfo_t * pxDeserializedInfo ); /** * @brief Call #MQTT_ProcessLoop in a loop for the duration of a timeout or * #MQTT_ProcessLoop returns a failure. * * @param[in] pMqttContext MQTT context pointer. * @param[in] ulTimeoutMs Duration to call #MQTT_ProcessLoop for. * * @return Returns the return value of the last call to #MQTT_ProcessLoop. */ static MQTTStatus_t prvProcessLoopWithTimeout( MQTTContext_t * pMqttContext, uint32_t ulTimeoutMs ); /*-----------------------------------------------------------*/ /** * @brief Static buffer used to hold MQTT messages being sent and received. */ static uint8_t ucSharedBuffer[ democonfigNETWORK_BUFFER_SIZE ]; /** * @brief Global entry time into the application to use as a reference timestamp * in the #prvGetTimeMs function. #prvGetTimeMs will always return the difference * between the current time and the global entry time. This will reduce the chances * of overflow for the 32 bit unsigned integer used for holding the timestamp. */ static uint32_t ulGlobalEntryTimeMs; /** * @brief Packet Identifier generated when Publish request was sent to the broker; * it is used to match received Publish ACK to the transmitted Publish packet. */ static uint16_t usPublishPacketIdentifier; /** * @brief Packet Identifier generated when Subscribe request was sent to the broker; * it is used to match received Subscribe ACK to the transmitted Subscribe packet. */ static uint16_t usSubscribePacketIdentifier; /** * @brief Packet Identifier generated when Unsubscribe request was sent to the broker; * it is used to match received Unsubscribe response to the transmitted Unsubscribe * request. */ static uint16_t usUnsubscribePacketIdentifier; /** * @brief A pair containing a topic filter and its SUBACK status. */ typedef struct topicFilterContext { const char * pcTopicFilter; MQTTSubAckStatus_t xSubAckStatus; } topicFilterContext_t; /** * @brief An array containing the context of a SUBACK; the SUBACK status * of a filter is updated when the event callback processes a SUBACK. */ static topicFilterContext_t xTopicFilterContext[ mqttexampleTOPIC_COUNT ] = { { mqttexampleTOPIC, MQTTSubAckFailure } }; /** @brief Static buffer used to hold MQTT messages being sent and received. */ static MQTTFixedBuffer_t xBuffer = { ucSharedBuffer, democonfigNETWORK_BUFFER_SIZE }; /** * @brief Array to track the outgoing publish records for outgoing publishes * with QoS > 0. * * This is passed into #MQTT_InitStatefulQoS to allow for QoS > 0. * */ static MQTTPubAckInfo_t pOutgoingPublishRecords[ mqttexampleOUTGOING_PUBLISH_RECORD_LEN ]; /** * @brief Array to track the incoming publish records for incoming publishes * with QoS > 0. * * This is passed into #MQTT_InitStatefulQoS to allow for QoS > 0. * */ static MQTTPubAckInfo_t pIncomingPublishRecords[ mqttexampleINCOMING_PUBLISH_RECORD_LEN ]; /*-----------------------------------------------------------*/ /* * @brief Create the task that demonstrates the MQTT API Demo over a * mutually authenticated network connection with MQTT broker. */ void vStartSimpleMQTTDemo( void ) { /* This example uses a single application task, which in turn is used to * connect, subscribe, publish, unsubscribe and disconnect from the MQTT * broker. * * Also see https://www.freertos.org/mqtt/mqtt-agent-demo.html? for an * alternative run time model whereby coreMQTT runs in an autonomous * background agent task. Executing the MQTT protocol in an agent task * removes the need for the application writer to explicitly manage any MQTT * state or call the MQTT_ProcessLoop() API function. Using an agent task * also enables multiple application tasks to more easily share a single * MQTT connection. */ prvMQTTDemoTask( NULL ); } /*-----------------------------------------------------------*/ /* * @brief The Example shown below uses MQTT APIs to create MQTT messages and * send them over the mutually authenticated network connection established with the * MQTT broker. This example is single threaded and uses statically allocated * memory. It uses QoS1 for sending to and receiving messages from the broker. * * This MQTT client subscribes to the topic as specified in mqttexampleTOPIC at the * top of this file by sending a subscribe packet and then waiting for a subscribe * acknowledgment (SUBACK).This client will then publish to the same topic it * subscribed to, so it will expect all the messages it sends to the broker to be * sent back to it from the broker. */ static void prvMQTTDemoTask( void * pvParameters ) { uint32_t ulPublishCount = 0U, ulTopicCount = 0U; const uint32_t ulMaxPublishCount = 5UL; NetworkContext_t xNetworkContext = { 0 }; TlsTransportParams_t xTlsTransportParams = { 0 }; NetworkCredentials_t xNetworkCredentials = { 0 }; MQTTContext_t xMQTTContext = { 0 }; MQTTStatus_t xMQTTStatus; TlsTransportStatus_t xNetworkStatus; /* Remove compiler warnings about unused parameters. */ ( void ) pvParameters; /* Set the entry time of the demo application. This entry time will be used * to calculate relative time elapsed in the execution of the demo application, * by the timer utility function that is provided to the MQTT library. */ ulGlobalEntryTimeMs = prvGetTimeMs(); /* Set the pParams member of the network context with desired transport. */ xNetworkContext.pParams = &xTlsTransportParams; for( ; ; ) { LogInfo( ( "---------STARTING DEMO---------\r\n" ) ); /****************************** Connect. ******************************/ /* Attempt to establish TLS session with MQTT broker. If connection fails, * retry after a timeout. Timeout value will be exponentially increased * until the maximum number of attempts are reached or the maximum timeout * value is reached. The function returns a failure status if the TCP * connection cannot be established to the broker after the configured * number of attempts. */ xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkCredentials, &xNetworkContext ); configASSERT( xNetworkStatus == TLS_TRANSPORT_SUCCESS ); /* Sends an MQTT Connect packet over the already established TLS connection, * and waits for connection acknowledgment (CONNACK) packet. */ LogInfo( ( "Creating an MQTT connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext ); /**************************** Subscribe. ******************************/ /* If server rejected the subscription request, attempt to resubscribe to * topic. Attempts are made according to the exponential backoff retry * strategy implemented in BackoffAlgorithm. */ prvMQTTSubscribeWithBackoffRetries( &xMQTTContext ); /****************** Publish and Keep Alive Loop. **********************/ /* Publish messages with QoS1, send and process Keep alive messages. */ for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ ) { LogInfo( ( "Publish to the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); prvMQTTPublishToTopic( &xMQTTContext ); /* Process incoming publish echo, since application subscribed to the * same topic, the broker will send publish message back to the * application. */ LogInfo( ( "Attempt to receive publish message from broker.\r\n" ) ); xMQTTStatus = prvProcessLoopWithTimeout( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); configASSERT( xMQTTStatus == MQTTSuccess ); /* Leave Connection Idle for some time. */ LogInfo( ( "Keeping Connection Idle...\r\n\r\n" ) ); vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES_TICKS ); } /******************** Unsubscribe from the topic. *********************/ LogInfo( ( "Unsubscribe from the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); prvMQTTUnsubscribeFromTopic( &xMQTTContext ); /* Process incoming UNSUBACK packet from the broker. */ xMQTTStatus = prvProcessLoopWithTimeout( &xMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); configASSERT( xMQTTStatus == MQTTSuccess ); /**************************** Disconnect. *****************************/ /* Send an MQTT Disconnect packet over the already connected TLS over * TCP connection. There is no corresponding response for the disconnect * packet. After sending disconnect, client must close the network * connection. */ LogInfo( ( "Disconnecting the MQTT connection with %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); xMQTTStatus = MQTT_Disconnect( &xMQTTContext ); configASSERT( xMQTTStatus == MQTTSuccess ); /* Close the network connection. */ TLS_FreeRTOS_Disconnect( &xNetworkContext ); /* Reset SUBACK status for each topic filter after completion of * subscription request cycle. */ for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) { xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure; } /* Wait for some time between two iterations to ensure that we do not * bombard the broker. */ LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. " "Total free heap is %u.\r\n", xPortGetFreeHeapSize() ) ); LogInfo( ( "Demo completed successfully.\r\n" ) ); LogInfo( ( "-------DEMO FINISHED-------\r\n" ) ); LogInfo( ( "Short delay before starting the next iteration.... \r\n\r\n" ) ); vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS_TICKS ); } } /*-----------------------------------------------------------*/ static TlsTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkCredentials_t * pxNetworkCredentials, NetworkContext_t * pxNetworkContext ) { TlsTransportStatus_t xNetworkStatus; BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess; BackoffAlgorithmContext_t xReconnectParams; uint16_t usNextRetryBackOff = 0U; #ifdef democonfigUSE_AWS_IOT_CORE_BROKER #if defined( democonfigCLIENT_USERNAME ) /* * When democonfigCLIENT_USERNAME is defined, use the "mqtt" alpn to connect * to AWS IoT Core with Custom Authentication on port 443. * * Custom Authentication uses the contents of the username and password * fields of the MQTT CONNECT packet to authenticate the client. * * For more information, refer to the documentation at: * https://docs.aws.amazon.com/iot/latest/developerguide/custom-authentication.html */ static const char * ppcAlpnProtocols[] = { "mqtt", NULL }; #if democonfigMQTT_BROKER_PORT != 443U #error "Connections to AWS IoT Core with custom authentication must connect to TCP port 443 with the \"mqtt\" alpn." #endif /* democonfigMQTT_BROKER_PORT != 443U */ #else /* if !defined( democonfigCLIENT_USERNAME ) */ /* * Otherwise, use the "x-amzn-mqtt-ca" alpn to connect to AWS IoT Core using * x509 Certificate Authentication. */ static const char * ppcAlpnProtocols[] = { "x-amzn-mqtt-ca", NULL }; #endif /* !defined( democonfigCLIENT_USERNAME ) */ /* * An ALPN identifier is only required when connecting to AWS IoT core on port 443. * https://docs.aws.amazon.com/iot/latest/developerguide/protocols.html */ #if democonfigMQTT_BROKER_PORT == 443U pxNetworkCredentials->pAlpnProtos = ppcAlpnProtocols; #elif democonfigMQTT_BROKER_PORT == 8883U pxNetworkCredentials->pAlpnProtos = NULL; #else /* democonfigMQTT_BROKER_PORT != 8883U */ pxNetworkCredentials->pAlpnProtos = NULL; #error "MQTT connections to AWS IoT Core are only allowed on ports 443 and 8883." #endif /* democonfigMQTT_BROKER_PORT != 443U */ #else /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ pxNetworkCredentials->pAlpnProtos = NULL; #endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ pxNetworkCredentials->disableSni = democonfigDISABLE_SNI; /* Set the credentials for establishing a TLS connection. */ pxNetworkCredentials->pRootCa = ( const unsigned char * ) democonfigROOT_CA_PEM; pxNetworkCredentials->rootCaSize = sizeof( democonfigROOT_CA_PEM ); #ifdef democonfigCLIENT_CERTIFICATE_PEM pxNetworkCredentials->pClientCert = ( const unsigned char * ) democonfigCLIENT_CERTIFICATE_PEM; pxNetworkCredentials->clientCertSize = sizeof( democonfigCLIENT_CERTIFICATE_PEM ); pxNetworkCredentials->pPrivateKey = ( const unsigned char * ) democonfigCLIENT_PRIVATE_KEY_PEM; pxNetworkCredentials->privateKeySize = sizeof( democonfigCLIENT_PRIVATE_KEY_PEM ); #endif /* Initialize reconnect attempts and interval. */ BackoffAlgorithm_InitializeParams( &xReconnectParams, mqttexampleRETRY_BACKOFF_BASE_MS, mqttexampleRETRY_MAX_BACKOFF_DELAY_MS, mqttexampleRETRY_MAX_ATTEMPTS ); /* Attempt to connect to MQTT broker. If connection fails, retry after * a timeout. Timeout value will exponentially increase till maximum * attempts are reached. */ do { /* Establish a TLS session with the MQTT broker. This example connects to * the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and * democonfigMQTT_BROKER_PORT at the top of this file. */ LogInfo( ( "Creating a TLS connection to %s:%u.\r\n", democonfigMQTT_BROKER_ENDPOINT, democonfigMQTT_BROKER_PORT ) ); /* Attempt to create a mutually authenticated TLS connection. */ xNetworkStatus = TLS_FreeRTOS_Connect( pxNetworkContext, democonfigMQTT_BROKER_ENDPOINT, democonfigMQTT_BROKER_PORT, pxNetworkCredentials, mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS, mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ); if( xNetworkStatus != TLS_TRANSPORT_SUCCESS ) { /* Generate a random number and calculate backoff value (in milliseconds) for * the next connection retry. * Note: It is recommended to seed the random number generator with a device-specific * entropy source so that possibility of multiple devices retrying failed network operations * at similar intervals can be avoided. */ xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xReconnectParams, uxRand(), &usNextRetryBackOff ); if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted ) { LogError( ( "Connection to the broker failed, all attempts exhausted." ) ); } else if( xBackoffAlgStatus == BackoffAlgorithmSuccess ) { LogWarn( ( "Connection to the broker failed. " "Retrying connection with backoff and jitter." ) ); vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) ); } } } while( ( xNetworkStatus != TLS_TRANSPORT_SUCCESS ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) ); return xNetworkStatus; } /*-----------------------------------------------------------*/ static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext, NetworkContext_t * pxNetworkContext ) { MQTTStatus_t xResult; MQTTConnectInfo_t xConnectInfo; bool xSessionPresent; TransportInterface_t xTransport; /*** * For readability, error handling in this function is restricted to the use of * asserts(). ***/ /* Fill in Transport Interface send and receive function pointers. */ xTransport.pNetworkContext = pxNetworkContext; xTransport.send = TLS_FreeRTOS_send; xTransport.recv = TLS_FreeRTOS_recv; xTransport.writev = NULL; /* Initialize MQTT library. */ xResult = MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xBuffer ); configASSERT( xResult == MQTTSuccess ); xResult = MQTT_InitStatefulQoS( pxMQTTContext, pOutgoingPublishRecords, mqttexampleOUTGOING_PUBLISH_RECORD_LEN, pIncomingPublishRecords, mqttexampleINCOMING_PUBLISH_RECORD_LEN ); configASSERT( xResult == MQTTSuccess ); /* Some fields are not used in this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) ); /* Start with a clean session i.e. direct the MQTT broker to discard any * previous session data. Also, establishing a connection with clean session * will ensure that the broker does not store any data when this client * gets disconnected. */ xConnectInfo.cleanSession = true; /* The client identifier is used to uniquely identify this MQTT client to * the MQTT broker. In a production device the identifier can be something * unique, such as a device serial number. */ xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER; xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER ); /* Set MQTT keep-alive period. If the application does not send packets at an interval less than * the keep-alive period, the MQTT library will send PINGREQ packets. */ xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS; /* Append metrics when connecting to the AWS IoT Core broker. */ #ifdef democonfigUSE_AWS_IOT_CORE_BROKER #if defined( democonfigCLIENT_USERNAME ) /* Append metrics string when connecting to AWS IoT Core with custom auth */ xConnectInfo.pUserName = democonfigCLIENT_USERNAME AWS_IOT_METRICS_STRING; xConnectInfo.userNameLength = ( uint16_t ) strlen( democonfigCLIENT_USERNAME AWS_IOT_METRICS_STRING ); /* Use the provided password as-is */ xConnectInfo.pPassword = democonfigCLIENT_PASSWORD; xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD ); #else /* If no username is needed, only send the metrics string */ xConnectInfo.pUserName = AWS_IOT_METRICS_STRING; xConnectInfo.userNameLength = ( uint16_t ) strlen( AWS_IOT_METRICS_STRING ); /* Password for authentication is not used. */ xConnectInfo.pPassword = NULL; xConnectInfo.passwordLength = 0U; #endif /* defined( democonfigCLIENT_USERNAME ) */ #else /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ #ifdef democonfigCLIENT_USERNAME xConnectInfo.pUserName = democonfigCLIENT_USERNAME; xConnectInfo.userNameLength = ( uint16_t ) strlen( democonfigCLIENT_USERNAME ); xConnectInfo.pPassword = democonfigCLIENT_PASSWORD; xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD ); #endif /* ifdef democonfigCLIENT_USERNAME */ #endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ /* Send MQTT CONNECT packet to broker. LWT is not used in this demo, so it * is passed as NULL. */ xResult = MQTT_Connect( pxMQTTContext, &xConnectInfo, NULL, mqttexampleCONNACK_RECV_TIMEOUT_MS, &xSessionPresent ); configASSERT( xResult == MQTTSuccess ); /* Successfully established and MQTT connection with the broker. */ LogInfo( ( "An MQTT connection is established with %s.", democonfigMQTT_BROKER_ENDPOINT ) ); } /*-----------------------------------------------------------*/ static void prvUpdateSubAckStatus( MQTTPacketInfo_t * pxPacketInfo ) { MQTTStatus_t xResult = MQTTSuccess; uint8_t * pucPayload = NULL; size_t ulSize = 0; uint32_t ulTopicCount = 0U; xResult = MQTT_GetSubAckStatusCodes( pxPacketInfo, &pucPayload, &ulSize ); /* MQTT_GetSubAckStatusCodes always returns success if called with packet info * from the event callback and non-NULL parameters. */ configASSERT( xResult == MQTTSuccess ); for( ulTopicCount = 0; ulTopicCount < ulSize; ulTopicCount++ ) { xTopicFilterContext[ ulTopicCount ].xSubAckStatus = pucPayload[ ulTopicCount ]; } } /*-----------------------------------------------------------*/ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ) { MQTTStatus_t xResult = MQTTSuccess; BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess; BackoffAlgorithmContext_t xRetryParams; uint16_t usNextRetryBackOff = 0U; MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; bool xFailedSubscribeToTopic = false; uint32_t ulTopicCount = 0U; /* Some fields not used by this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); /* Get a unique packet id. */ usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to * only one topic and uses QoS1. */ xMQTTSubscription[ 0 ].qos = MQTTQoS1; xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); /* Initialize context for backoff retry attempts if SUBSCRIBE request fails. */ BackoffAlgorithm_InitializeParams( &xRetryParams, mqttexampleRETRY_BACKOFF_BASE_MS, mqttexampleRETRY_MAX_BACKOFF_DELAY_MS, mqttexampleRETRY_MAX_ATTEMPTS ); do { /* The client is now connected to the broker. Subscribe to the topic * as specified in mqttexampleTOPIC at the top of this file by sending a * subscribe packet then waiting for a subscribe acknowledgment (SUBACK). * This client will then publish to the same topic it subscribed to, so it * will expect all the messages it sends to the broker to be sent back to it * from the broker. This demo uses QOS0 in Subscribe, therefore, the Publish * messages received from the broker will have QOS0. */ LogInfo( ( "Attempt to subscribe to the MQTT topic %s.\r\n", mqttexampleTOPIC ) ); xResult = MQTT_Subscribe( pxMQTTContext, xMQTTSubscription, sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), usSubscribePacketIdentifier ); configASSERT( xResult == MQTTSuccess ); LogInfo( ( "SUBSCRIBE sent for topic %s to broker.\n\n", mqttexampleTOPIC ) ); /* Process incoming packet from the broker. After sending the subscribe, the * client may receive a publish before it receives a subscribe ack. Therefore, * call generic incoming packet processing function. Since this demo is * subscribing to the topic to which no one is publishing, probability of * receiving Publish message before subscribe ack is zero; but application * must be ready to receive any packet. This demo uses the generic packet * processing function everywhere to highlight this fact. */ xResult = prvProcessLoopWithTimeout( pxMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); configASSERT( xResult == MQTTSuccess ); /* Reset flag before checking suback responses. */ xFailedSubscribeToTopic = false; /* Check if recent subscription request has been rejected. #xTopicFilterContext is updated * in the event callback to reflect the status of the SUBACK sent by the broker. It represents * either the QoS level granted by the server upon subscription, or acknowledgement of * server rejection of the subscription request. */ for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) { if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus == MQTTSubAckFailure ) { xFailedSubscribeToTopic = true; /* Generate a random number and calculate backoff value (in milliseconds) for * the next connection retry. * Note: It is recommended to seed the random number generator with a device-specific * entropy source so that possibility of multiple devices retrying failed network operations * at similar intervals can be avoided. */ xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xRetryParams, uxRand(), &usNextRetryBackOff ); if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted ) { LogError( ( "Server rejected subscription request. All retry attempts have exhausted. Topic=%s", xTopicFilterContext[ ulTopicCount ].pcTopicFilter ) ); } else if( xBackoffAlgStatus == BackoffAlgorithmSuccess ) { LogWarn( ( "Server rejected subscription request. Attempting to re-subscribe to topic %s.", xTopicFilterContext[ ulTopicCount ].pcTopicFilter ) ); /* Backoff before the next re-subscribe attempt. */ vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) ); } break; } } configASSERT( xBackoffAlgStatus != BackoffAlgorithmRetriesExhausted ); } while( ( xFailedSubscribeToTopic == true ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) ); } /*-----------------------------------------------------------*/ static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext ) { MQTTStatus_t xResult; MQTTPublishInfo_t xMQTTPublishInfo; /*** * For readability, error handling in this function is restricted to the use of * asserts(). ***/ /* Some fields are not used by this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) ); /* This demo uses QoS1. */ xMQTTPublishInfo.qos = MQTTQoS1; xMQTTPublishInfo.retain = false; xMQTTPublishInfo.pTopicName = mqttexampleTOPIC; xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC ); xMQTTPublishInfo.pPayload = mqttexampleMESSAGE; xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE ); /* Get a unique packet id. */ usPublishPacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Send PUBLISH packet. Packet ID is not used for a QoS1 publish. */ xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, usPublishPacketIdentifier ); configASSERT( xResult == MQTTSuccess ); } /*-----------------------------------------------------------*/ static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ) { MQTTStatus_t xResult; MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; /* Some fields not used by this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); /* Get a unique packet id. */ usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to * only one topic and uses QoS1. */ xMQTTSubscription[ 0 ].qos = MQTTQoS1; xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); /* Get next unique packet identifier. */ usUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Send UNSUBSCRIBE packet. */ xResult = MQTT_Unsubscribe( pxMQTTContext, xMQTTSubscription, sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), usUnsubscribePacketIdentifier ); configASSERT( xResult == MQTTSuccess ); } /*-----------------------------------------------------------*/ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, uint16_t usPacketId ) { uint32_t ulTopicCount = 0U; switch( pxIncomingPacket->type ) { case MQTT_PACKET_TYPE_PUBACK: LogInfo( ( "PUBACK received for packet Id %u.\r\n", usPacketId ) ); /* Make sure ACK packet identifier matches with Request packet identifier. */ configASSERT( usPublishPacketIdentifier == usPacketId ); break; case MQTT_PACKET_TYPE_SUBACK: /* A SUBACK from the broker, containing the server response to our subscription request, has been received. * It contains the status code indicating server approval/rejection for the subscription to the single topic * requested. The SUBACK will be parsed to obtain the status code, and this status code will be stored in global * variable #xTopicFilterContext. */ prvUpdateSubAckStatus( pxIncomingPacket ); for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ ) { if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus != MQTTSubAckFailure ) { LogInfo( ( "Subscribed to the topic %s with maximum QoS %u.\r\n", xTopicFilterContext[ ulTopicCount ].pcTopicFilter, xTopicFilterContext[ ulTopicCount ].xSubAckStatus ) ); } } /* Make sure ACK packet identifier matches with Request packet identifier. */ configASSERT( usSubscribePacketIdentifier == usPacketId ); break; case MQTT_PACKET_TYPE_UNSUBACK: LogInfo( ( "Unsubscribed from the topic %s.\r\n", mqttexampleTOPIC ) ); /* Make sure ACK packet identifier matches with Request packet identifier. */ configASSERT( usUnsubscribePacketIdentifier == usPacketId ); break; case MQTT_PACKET_TYPE_PINGRESP: /* Nothing to be done from application as library handles * PINGRESP with the use of MQTT_ProcessLoop API function. */ LogWarn( ( "PINGRESP should not be handled by the application " "callback when using MQTT_ProcessLoop.\n" ) ); break; /* Any other packet type is invalid. */ default: LogWarn( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).\r\n", pxIncomingPacket->type ) ); } } /*-----------------------------------------------------------*/ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ) { configASSERT( pxPublishInfo != NULL ); /* Process incoming Publish. */ LogInfo( ( "Incoming QoS : %d\n", pxPublishInfo->qos ) ); /* Verify the received publish is for the we have subscribed to. */ if( ( pxPublishInfo->topicNameLength == strlen( mqttexampleTOPIC ) ) && ( 0 == strncmp( mqttexampleTOPIC, pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength ) ) ) { LogInfo( ( "\r\nIncoming Publish Topic Name: %.*s matches subscribed topic.\r\n" "Incoming Publish Message : %.*s\r\n", pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName, pxPublishInfo->payloadLength, pxPublishInfo->pPayload ) ); } else { LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.\r\n", pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName ) ); } } /*-----------------------------------------------------------*/ static void prvEventCallback( MQTTContext_t * pxMQTTContext, MQTTPacketInfo_t * pxPacketInfo, MQTTDeserializedInfo_t * pxDeserializedInfo ) { /* The MQTT context is not used for this demo. */ ( void ) pxMQTTContext; if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH ) { prvMQTTProcessIncomingPublish( pxDeserializedInfo->pPublishInfo ); } else { prvMQTTProcessResponse( pxPacketInfo, pxDeserializedInfo->packetIdentifier ); } } /*-----------------------------------------------------------*/ static uint32_t prvGetTimeMs( void ) { TickType_t xTickCount = 0; uint32_t ulTimeMs = 0UL; /* Get the current tick count. */ xTickCount = xTaskGetTickCount(); /* Convert the ticks to milliseconds. */ ulTimeMs = ( uint32_t ) xTickCount * MILLISECONDS_PER_TICK; /* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the * elapsed time in the application. */ ulTimeMs = ( uint32_t ) ( ulTimeMs - ulGlobalEntryTimeMs ); return ulTimeMs; } /*-----------------------------------------------------------*/ static MQTTStatus_t prvProcessLoopWithTimeout( MQTTContext_t * pMqttContext, uint32_t ulTimeoutMs ) { uint32_t ulMqttProcessLoopTimeoutTime; uint32_t ulCurrentTime; MQTTStatus_t eMqttStatus = MQTTSuccess; ulCurrentTime = pMqttContext->getTime(); ulMqttProcessLoopTimeoutTime = ulCurrentTime + ulTimeoutMs; /* Call MQTT_ProcessLoop multiple times a timeout happens, or * MQTT_ProcessLoop fails. */ while( ( ulCurrentTime < ulMqttProcessLoopTimeoutTime ) && ( eMqttStatus == MQTTSuccess || eMqttStatus == MQTTNeedMoreBytes ) ) { eMqttStatus = MQTT_ProcessLoop( pMqttContext ); ulCurrentTime = pMqttContext->getTime(); } if( eMqttStatus == MQTTNeedMoreBytes ) { eMqttStatus = MQTTSuccess; } return eMqttStatus; } /*-----------------------------------------------------------*/