diff --git a/.gitmodules b/.gitmodules index 393784b6..762fbcbd 100644 --- a/.gitmodules +++ b/.gitmodules @@ -2,3 +2,6 @@ [submodule "dev/OpenOCD"] path = dev/OpenOCD url = https://github.com/STMicroelectronics/OpenOCD.git +[submodule "dev/nanopb/nanopb"] + path = dev/nanopb/nanopb + url = https://github.com/nanopb/nanopb.git diff --git a/NetX/inc/u_nx_ethernet.h b/NetX/inc/u_nx_ethernet.h index 95afd7e9..07f8f46c 100644 --- a/NetX/inc/u_nx_ethernet.h +++ b/NetX/inc/u_nx_ethernet.h @@ -17,6 +17,11 @@ #define ETH_MESSAGE_SIZE 128 /* Maximum ethernet message size in bytes. */ #define ETH_MAX_PACKETS 10 /* Maximum number of packets we wanna handle simultaneously */ #define ETH_NUMBER_OF_NODES 8 /* Number of nodes in the network. */ +#define ETH_ENABLE_MANUAL_UDP_MULTICAST 0 // whether to enable UDP multicast +#define ETH_ENABLE_IGMP 1 // whether to enable IGMP +#define ETH_ENABLE_MQTT 1 // whether to use a MQTT connection +#define ETH_MQTT_SERVER_IP IP_ADDRESS(10,0,0,1) // the server address of the broker (TPU usually) +#define ETH_MQTT_SERVER_PORT 1883 typedef enum { TPU = (1 << 0), // 0b00000001 @@ -61,7 +66,12 @@ typedef struct __attribute__((__packed__)) { /* Function Pointers (for initialization). */ typedef void (*DriverFunction)(NX_IP_DRIVER *); /* User-supplied network driver used to send and receive IP packets. */ + +#if ETH_ENABLE_MANUAL_UDP_MULTICAST +typedef void (*OnRecieve)(ethernet_message_t message); /* User-supplied function that will be called whenever an ethernet message is recieved. */ +#else typedef void (*OnRecieve)(ethernet_message_t message); /* User-supplied function that will be called whenever an ethernet message is recieved. */ +#endif /** * @brief Initializes the NetX ethernet system in a repo. @@ -70,8 +80,9 @@ typedef void (*OnRecieve)(ethernet_message_t message); /* User-supplied function * @param on_recieve User-supplied function to be called whenever an ethernet message is recieved. The function's only parameter is an ethernet_message_t instance containing the recieved message. * @return Status. */ -uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve on_recieve); +UINT ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve on_recieve); +#if ETH_ENABLE_MANUAL_UDP_MULTICAST /** * @brief Creates an ethernet message. Can be send with ethernet_send_message(), or added to a queue. * @param recipient_id The ID of the recipient node. @@ -88,6 +99,35 @@ ethernet_message_t ethernet_create_message(uint8_t message_id, ethernet_node_t r * @return Status. */ uint8_t ethernet_send_message(ethernet_message_t *message); +#endif + +#if ETH_ENABLE_MQTT +/** + * @brief Sends a MQTT message to outgoing queue + * @param topic_name The topic name + * @param topic_size The topic size in bytes + * @param message The data to send + * @param message_size The message size in bytes + * @return The error code. + */ +UINT ethernet_mqtt_publish(char* topic_name, UINT topic_size, char* message, UINT message_size); + +/** + * Connect to a disconnected MQTT server (NX_MQTT_NOT_CONNECTED) + * Will yield while trying to connect + */ +UINT ethernet_mqtt_reconnect(void); + +/** + * + */ +//ethernet_mqtt_subscribe(); + +/** + * + */ + +#endif /** * @brief Retrieves the time from PTP stack. @@ -95,5 +135,10 @@ uint8_t ethernet_send_message(ethernet_message_t *message); */ NX_PTP_DATE_TIME ethernet_get_time(void); +/** + * Debugging, print the status of ARP statistics + */ +UINT ethernent_print_arp_status(void); + // clang-format on #endif /* u_nx_ethernet.h */ diff --git a/NetX/inc/u_nx_protobuf.h b/NetX/inc/u_nx_protobuf.h new file mode 100644 index 00000000..360b9bf6 --- /dev/null +++ b/NetX/inc/u_nx_protobuf.h @@ -0,0 +1,83 @@ +#pragma once + +// clang-format off + +/* +* Wrapper for structuring and sending protobuf Ethernet messages over MQTT. +* This API sits above the lower-level u_nx_ethernet.h driver layer. Messages with nonstandard formats can still be sent using the u_nx_ethernet.h API directly. +*/ + +#include +#include +#include "serverdata.pb.h" + +/* Helper macros. */ +#define PB_STR_HELPER(x) #x // Helper for PB_TOSTR(). Probably should never use directly. +#define PB_TOSTR(x) PB_STR_HELPER(x) // Converts a macro's value into a string. +#define PB_COUNT_ARGS(...) (sizeof((float[]){ __VA_ARGS__ }) / sizeof(float)) // Returns the number of arguments passed into it. +#define PB_STR_LEN(s) (sizeof(s) - 1) // Returns the length of a string literal. + +/* CONFIG: Compile-time validation of topic size, unit size, and number of values. */ +#define PB_MAX_TOPIC_LENGTH 100 // Maximum +#define PB_MAX_UNIT_LENGTH 15 +#define PB_MIN_DATAPOINTS 1 +#define PB_MAX_DATAPOINTS 5 +#define PB_VALIDATE_ARGS(topic, unit, num_values) \ + do { \ + _Static_assert( \ + PB_STR_LEN(topic) <= PB_MAX_TOPIC_LENGTH, \ + "MQTT topic parameter exceeds maximum length of " PB_TOSTR(PB_MAX_TOPIC_LENGTH) " allowed by `nx_protobuf_mqtt_message_create()`."\ + ); \ + _Static_assert( \ + PB_STR_LEN(unit) <= PB_MAX_UNIT_LENGTH, \ + "MQTT unit parameter exceeds maximum length of " PB_TOSTR(PB_MAX_UNIT_LENGTH) " allowed by `nx_protobuf_mqtt_message_create()`." \ + ); \ + _Static_assert( \ + (num_values) >= PB_MIN_DATAPOINTS, \ + "Must pass at least " PB_TOSTR(PB_MIN_DATAPOINTS) " value into the variable argument of `nx_protobuf_mqtt_message_create()`." \ + ); \ + _Static_assert( \ + (num_values) <= PB_MAX_DATAPOINTS, \ + "Cannot pass more than " PB_TOSTR(PB_MAX_DATAPOINTS) " values into the variable argument of `nx_protobuf_mqtt_message_create()`." \ + ); \ + } while (0) + +/** + * @brief Creates and formats a `ethernet_mqtt_message_t` object, and returns it to the caller. + * @param topic (const char*) String literal representing the message's MQTT topic name. + * @param unit (const char*) String literal representing the unit of the message's data. + * @param ... (float) The data to be sent in the message. This is a variable argument, so it can be repeated depending on how many datapoints you want to send. If you pass in more datapoints than allowed, you will get a compile-time error. + * @return An `ethernet_mqtt_message_t` object. + * @note If message creation was not completed for any reason, .initialized will be false in the returned `ethernet_mqtt_message_t` object. You may still use the object as you please (including attempting to initialize it again), but attempting to send the message (via `nx_protobuf_mqtt_message_send()`) will return an error. + */ +#define nx_protobuf_mqtt_message_create(topic, unit, ...) \ + ({ \ + PB_VALIDATE_ARGS(topic, unit, PB_COUNT_ARGS(__VA_ARGS__)); \ + _nx_protobuf_mqtt_message_create( \ + (topic), PB_STR_LEN(topic), \ + (unit), PB_STR_LEN(unit), \ + (float[]){ __VA_ARGS__ }, \ + PB_COUNT_ARGS(__VA_ARGS__) \ + ); \ + }) + + +/* Ethernet MQTT Message. */ +typedef struct { + const char* topic; + int topic_size; + serverdata_v2_ServerData protobuf; + bool initialized; +} ethernet_mqtt_message_t; + +/** + * @brief Dispatches a `ethernet_mqtt_message_t` message over MQTT. + * @param message The message to send. + * @return U_SUCCESS if successful, U_ERROR is not successful. + */ +int nx_protobuf_mqtt_message_send(ethernet_mqtt_message_t* message); + +/* MACRO IMPLEMENTATIONS */ +ethernet_mqtt_message_t _nx_protobuf_mqtt_message_create(const char* topic, size_t topic_size, const char* unit, size_t unit_len, const float values[], int values_count); + +// clang-format on \ No newline at end of file diff --git a/NetX/src/u_nx_debug.c b/NetX/src/u_nx_debug.c index 25808ed8..4add549d 100644 --- a/NetX/src/u_nx_debug.c +++ b/NetX/src/u_nx_debug.c @@ -1,4 +1,16 @@ #include "u_nx_debug.h" +#include "nxd_ptp_client.h" + +#if defined(__has_include) +#if __has_include("nxd_mqtt_client.h") +#include "nxd_mqtt_client.h" +#define U_NX_DEBUG_HAS_MQTT 1 +#endif +#endif + +#ifndef U_NX_DEBUG_HAS_MQTT +#define U_NX_DEBUG_HAS_MQTT 0 +#endif // clang-format off @@ -72,6 +84,45 @@ const char* nx_status_toString(UINT status) { case NX_OPTION_HEADER_ERROR: return "NX_OPTION_HEADER_ERROR"; case NX_CONTINUE: return "NX_CONTINUE"; case NX_TCPIP_OFFLOAD_ERROR: return "NX_TCPIP_OFFLOAD_ERROR"; + + /* MQTT-specific stuff. */ +#if U_NX_DEBUG_HAS_MQTT +#if (NXD_MQTT_SUCCESS != NX_SUCCESS) + case NXD_MQTT_SUCCESS: return "NXD_MQTT_SUCCESS"; +#endif + case NXD_MQTT_ALREADY_CONNECTED: return "NXD_MQTT_ALREADY_CONNECTED"; + case NXD_MQTT_NOT_CONNECTED: return "NXD_MQTT_NOT_CONNECTED"; + case NXD_MQTT_MUTEX_FAILURE: return "NXD_MQTT_MUTEX_FAILURE"; + case NXD_MQTT_INTERNAL_ERROR: return "NXD_MQTT_INTERNAL_ERROR"; + case NXD_MQTT_CONNECT_FAILURE: return "NXD_MQTT_CONNECT_FAILURE"; + case NXD_MQTT_PACKET_POOL_FAILURE: return "NXD_MQTT_PACKET_POOL_FAILURE"; + case NXD_MQTT_COMMUNICATION_FAILURE: return "NXD_MQTT_COMMUNICATION_FAILURE"; + case NXD_MQTT_SERVER_MESSAGE_FAILURE: return "NXD_MQTT_SERVER_MESSAGE_FAILURE"; + case NXD_MQTT_INVALID_PARAMETER: return "NXD_MQTT_INVALID_PARAMETER"; + case NXD_MQTT_NO_MESSAGE: return "NXD_MQTT_NO_MESSAGE"; + case NXD_MQTT_PACKET_POOL_EMPTY: return "NXD_MQTT_PACKET_POOL_EMPTY"; + case NXD_MQTT_QOS2_NOT_SUPPORTED: return "NXD_MQTT_QOS2_NOT_SUPPORTED"; + case NXD_MQTT_INSUFFICIENT_BUFFER_SPACE: return "NXD_MQTT_INSUFFICIENT_BUFFER_SPACE"; + case NXD_MQTT_CLIENT_NOT_RUNNING: return "NXD_MQTT_CLIENT_NOT_RUNNING"; + case NXD_MQTT_INVALID_PACKET: return "NXD_MQTT_INVALID_PACKET"; + case NXD_MQTT_PARTIAL_PACKET: return "NXD_MQTT_PARTIAL_PACKET"; + case NXD_MQTT_CONNECTING: return "NXD_MQTT_CONNECTING"; + case NXD_MQTT_INVALID_STATE: return "NXD_MQTT_INVALID_STATE"; + case NXD_MQTT_ERROR_CONNECT_RETURN_CODE: return "NXD_MQTT_ERROR_CONNECT_RETURN_CODE"; + case NXD_MQTT_ERROR_UNACCEPTABLE_PROTOCOL: return "NXD_MQTT_ERROR_UNACCEPTABLE_PROTOCOL"; + case NXD_MQTT_ERROR_IDENTIFYIER_REJECTED: return "NXD_MQTT_ERROR_IDENTIFYIER_REJECTED"; + case NXD_MQTT_ERROR_SERVER_UNAVAILABLE: return "NXD_MQTT_ERROR_SERVER_UNAVAILABLE"; + case NXD_MQTT_ERROR_BAD_USERNAME_PASSWORD: return "NXD_MQTT_ERROR_BAD_USERNAME_PASSWORD"; + case NXD_MQTT_ERROR_NOT_AUTHORIZED: return "NXD_MQTT_ERROR_NOT_AUTHORIZED"; +#endif + + /* PTP-specific stuff. */ + case NX_PTP_CLIENT_NOT_STARTED: return "NX_PTP_CLIENT_NOT_STARTED"; + case NX_PTP_CLIENT_ALREADY_STARTED: return "NX_PTP_CLIENT_ALREADY_STARTED"; + case NX_PTP_PARAM_ERROR: return "NX_PTP_PARAM_ERROR"; + case NX_PTP_CLIENT_INSUFFICIENT_PACKET_PAYLOAD: return "NX_PTP_CLIENT_INSUFFICIENT_PACKET_PAYLOAD"; + case NX_PTP_CLIENT_CLOCK_CALLBACK_FAILURE: return "NX_PTP_CLIENT_CLOCK_CALLBACK_FAILURE"; + default: return "UNKNOWN_STATUS"; } } diff --git a/NetX/src/u_nx_ethernet.c b/NetX/src/u_nx_ethernet.c index d1a6ee38..f5474029 100644 --- a/NetX/src/u_nx_ethernet.c +++ b/NetX/src/u_nx_ethernet.c @@ -2,49 +2,83 @@ #include "u_nx_ethernet.h" #include "nx_stm32_eth_driver.h" #include "nxd_ptp_client.h" +#include "tx_api.h" #include "u_nx_debug.h" #include "u_tx_debug.h" #include "c_utils.h" #include "nx_api.h" +#include "u_tx_general.h" #include #include +#include +#if ETH_ENABLE_MQTT +#include "nxd_mqtt_client.h" +#endif /* PRIVATE MACROS */ -#define _IP_THREAD_STACK_SIZE 2048 -#define _ARP_CACHE_SIZE 1024 +#define _IP_THREAD_STACK_SIZE 4096 +#define _ARP_CACHE_SIZE 2048 #define _IP_THREAD_PRIORITY 1 #define _IP_NETWORK_MASK IP_ADDRESS(255, 255, 255, 0) #define _UDP_QUEUE_MAXIMUM 12 #define _PTP_THREAD_PRIORITY 2 +#define _MQTT_THREAD_PRIORITY 2 /* The DEFAULT_PAYLOAD_SIZE should match with RxBuffLen configured via MX_ETH_Init */ #define DEFAULT_PAYLOAD_SIZE 1524 -#define NX_APP_PACKET_POOL_SIZE ((DEFAULT_PAYLOAD_SIZE + sizeof(NX_PACKET)) * 10) +#define NX_APP_PACKET_POOL_SIZE ((DEFAULT_PAYLOAD_SIZE + sizeof(NX_PACKET)) * 100) +#define MQTT_CLIENT_STACK_SIZE 8192 + +extern ETH_HandleTypeDef heth; /* DEVICE INFO */ typedef struct { - /* NetX Objects */ - NX_UDP_SOCKET socket; - NX_PACKET_POOL packet_pool; + uint8_t node_id; + NX_IP ip; - NX_PTP_CLIENT ptp_client; - SHORT ptp_utc_offset; + UCHAR ip_memory[_IP_THREAD_STACK_SIZE]; + DriverFunction + driver; /* Set by the user. Used to communicate with the driver layer. */ + OnRecieve on_recieve; /* Set by the user. Called when a message is recieved. */ - /* Static memory for NetX stuff */ + NX_PACKET_POOL packet_pool; UCHAR packet_pool_memory[NX_APP_PACKET_POOL_SIZE]; - UCHAR ip_memory[_IP_THREAD_STACK_SIZE]; + UCHAR arp_cache_memory[_ARP_CACHE_SIZE]; + + #if ETH_ENABLE_MANUAL_UDP_MULTICAST + NX_UDP_SOCKET socket; + #endif + + #if ETH_ENABLE_MQTT + NXD_MQTT_CLIENT mqtt_client; + UCHAR mqtt_thread_stack[MQTT_CLIENT_STACK_SIZE / sizeof(ULONG)]; + #endif + + + NX_PTP_CLIENT ptp_client; + SHORT ptp_utc_offset; ULONG ptp_stack[2048 / sizeof(ULONG)]; - /* Device config variables */ bool is_initialized; - uint8_t node_id; - DriverFunction - driver; /* Set by the user. Used to communicate with the driver layer. */ - OnRecieve on_recieve; /* Set by the user. Called when a message is recieved. */ } _ethernet_device_t; static _ethernet_device_t device = { 0 }; +/* CALLBACK FUNCTIONS */ +#if ETH_ENABLE_MQTT +static VOID _mqtt_disconnect_callback(NXD_MQTT_CLIENT *client_ptr) +{ + PRINTLN_WARNING("client disconnected from server\n"); +} + +static VOID _mqtt_recieve_callback(NXD_MQTT_CLIENT* client_ptr, UINT number_of_messages) +{ + //tx_event_flags_set(&mqtt_app_flag, DEMO_MESSAGE_EVENT, TX_OR); + return; +} +#endif + + /* Callback function. Called when a PTP event is processed. */ // extern UINT ptp_clock_callback(NX_PTP_CLIENT *client_ptr, UINT operation, // NX_PTP_TIME *time_ptr, NX_PACKET *packet_ptr, @@ -85,6 +119,7 @@ static UINT _ptp_event_callback(NX_PTP_CLIENT *ptp_client_ptr, UINT event, VOID } /* Callback function. Called when an ethernet message is received. */ +#if ETH_ENABLE_MANUAL_UDP_MULTICAST static void _receive_message(NX_UDP_SOCKET *socket) { NX_PACKET *packet; ULONG bytes_copied; @@ -130,12 +165,13 @@ static void _receive_message(NX_UDP_SOCKET *socket) { /* Release the packet */ nx_packet_release(packet); } +#endif /* API FUNCTIONS */ -uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve on_recieve) { +UINT ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve on_recieve) { - uint8_t status; + UINT status; device.ptp_utc_offset = 0; // no offset to start /* Make sure device isn't already initialized */ @@ -166,8 +202,8 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve status = nx_ip_create( &device.ip, // Pointer to the IP instance "Ethernet IP Instance", // Name - IP_ADDRESS(5, 5, 5, device.node_id), // Dummy unicast IP (we shouldn't have to use this if we're just using multicast for everything) - _IP_NETWORK_MASK, // Network mask + IP_ADDRESS(10, 0, 0, device.node_id), // Unicast IP derived from node_id, 10.0.0.0/24 + _IP_NETWORK_MASK, // Network mask /24 &device.packet_pool, // Pointer to the packet pool device.driver, // Pointer to the Ethernet driver function device.ip_memory, // Pointer to the memory for the IP instance @@ -179,6 +215,15 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve return status; } + // behold the magic + // this is a poll function to wait until the driver has completely initialized + // without it the arp module and the TCP module fail silently and confusingly + ULONG current_status; + do { + nx_ip_driver_direct_command(&device.ip, 51, ¤t_status); + tx_thread_sleep(100); + } while (current_status != 4);// NX_DRIVER_STATE_LINK_ENABLED + /* Enable ARP */ status = nx_arp_enable( &device.ip, // IP instance @@ -191,6 +236,21 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve } + /* Enable igmp */ +#if ETH_ENABLE_IGMP || ETH_ENABLE_MANUAL_UDP_MULTICAST + status = nx_igmp_enable(&device.ip); + if (status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to enable igmp (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } +#endif + + status = nx_icmp_enable(&device.ip); + if (status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to enable icmp (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + /* Enable UDP */ status = nx_udp_enable(&device.ip); if (status != NX_SUCCESS) { @@ -198,13 +258,13 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve return status; } - /* Enable igmp */ - status = nx_igmp_enable(&device.ip); + status = nx_tcp_enable(&device.ip); if (status != NX_SUCCESS) { - PRINTLN_ERROR("Failed to enable igmp (Status: %d/%s).", status, nx_status_toString(status)); + PRINTLN_ERROR("Failed to enable TCP (Status: %d/%s).", status, nx_status_toString(status)); return status; } +#if ETH_ENABLE_MANUAL_UDP_MULTICAST /* Set up multicast groups. * (This iterates through every possible node combination between 0b00000001 and 0b11111111. * If any of the combinations include device.node_id, that combination gets added as a multicast group. @@ -224,22 +284,6 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve } } - /* Create the PTP client instance */ - status = nx_ptp_client_create(&device.ptp_client, &device.ip, 0, &device.packet_pool, - _PTP_THREAD_PRIORITY, (UCHAR *)&device.ptp_stack, sizeof(device.ptp_stack), - _nx_ptp_client_soft_clock_callback, NX_NULL); - if(status != NX_SUCCESS) { - PRINTLN_ERROR("Failed to create PTP client (Status: %d/%s).", status, nx_status_toString(status)); - return status; - } - - /* start the PTP client */ - status = nx_ptp_client_start(&device.ptp_client, NX_NULL, 0, 0, 0, _ptp_event_callback, NX_NULL); - if(status != NX_SUCCESS) { - PRINTLN_ERROR("Failed to start PTP client (Status: %d/%s).", status, nx_status_toString(status)); - return status; - } - /* Create UDP socket for broadcasting */ status = nx_udp_socket_create( &device.ip, // IP instance @@ -278,15 +322,72 @@ uint8_t ethernet_init(ethernet_node_t node_id, DriverFunction driver, OnRecieve nx_udp_socket_delete(&device.socket); return status; } +#endif + + /* Create the PTP client instance */ + status = nx_ptp_client_create(&device.ptp_client, &device.ip, 0, &device.packet_pool, + _PTP_THREAD_PRIORITY, (UCHAR *)&device.ptp_stack, sizeof(device.ptp_stack), + _nx_ptp_client_soft_clock_callback, NX_NULL); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to create PTP client (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + + /* start the PTP client */ + status = nx_ptp_client_start(&device.ptp_client, NX_NULL, 0, 0, 0, _ptp_event_callback, NX_NULL); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to start PTP client (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + +#if ETH_ENABLE_MQTT +/* Create MQTT client instance. */ + char client_id[8] = ""; + UINT client_id_size = sprintf(client_id, "FWD-%d", device.node_id); + status = nxd_mqtt_client_create(&device.mqtt_client, "MQTT client", + client_id, client_id_size, &device.ip, &device.packet_pool, + (VOID*)device.mqtt_thread_stack, sizeof(device.mqtt_thread_stack), + _MQTT_THREAD_PRIORITY, NX_NULL, 0); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to create MQTT client (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + + /* Register the disconnect notification function. */ + status = nxd_mqtt_client_disconnect_notify_set(&device.mqtt_client, _mqtt_disconnect_callback); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to create MQTT disconnect notification (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + + /* Set the receive notify function. */ + status = nxd_mqtt_client_receive_notify_set(&device.mqtt_client, _mqtt_recieve_callback); + if (status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to set mqtt recv notification (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + + NXD_ADDRESS server_ip; + server_ip.nxd_ip_version = 4; + server_ip.nxd_ip_address.v4 = ETH_MQTT_SERVER_IP; + /* Start the connection to the server. */ + status = nxd_mqtt_client_connect(&device.mqtt_client, &server_ip, ETH_MQTT_SERVER_PORT, + 1000, NX_TRUE, NX_WAIT_FOREVER); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to connect to MQTT client (Status: %d/%s).", status, nx_status_toString(status)); + } else { + } +#endif /* Mark device as initialized. */ device.is_initialized = true; - PRINTLN_INFO("Ran ethernet_init()."); + PRINTLN_INFO("Ran ethernet_init()"); return NX_SUCCESS; } /* Creates an ethernet message (i.e. returns an ethernet_message_t instance). */ +#if ETH_ENABLE_MANUAL_UDP_MULTICAST ethernet_message_t ethernet_create_message(uint8_t message_id, ethernet_node_t recipient_id, uint8_t *data, uint8_t data_length) { ethernet_message_t message = {0}; @@ -391,6 +492,37 @@ uint8_t ethernet_send_message(ethernet_message_t *message) { PRINTLN_INFO("Sent ethernet message (Recipient ID: %d, Message ID: %d, Message Contents: %d).", message->recipient_id, message->message_id, message->data); return U_SUCCESS; } +#endif + +#if ETH_ENABLE_MQTT +UINT ethernet_mqtt_publish(char *topic_name, UINT topic_size, char *message, UINT message_size) { + return nxd_mqtt_client_publish(&device.mqtt_client, topic_name, topic_size-1, message, message_size, NX_FALSE, 0, MS_TO_TICKS(100)); +} + +UINT ethernet_mqtt_reconnect(void) { + NXD_ADDRESS server_ip; + server_ip.nxd_ip_version = 4; + server_ip.nxd_ip_address.v4 = ETH_MQTT_SERVER_IP; + // TODO fix bug that breaks reconnection with 0x10007 + nxd_mqtt_client_delete(&device.mqtt_client); + + char client_id[8] = ""; + UINT client_id_size = sprintf(client_id, "FWD-%d", device.node_id); + + UINT status = nxd_mqtt_client_create(&device.mqtt_client, "MQTT client", + client_id, client_id_size, &device.ip, &device.packet_pool, + (VOID*)device.mqtt_thread_stack, sizeof(device.mqtt_thread_stack), + _MQTT_THREAD_PRIORITY, NX_NULL, 0); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to create MQTT client (Status: %d/%s).", status, nx_status_toString(status)); + return status; + } + + /* Start the connection to the server. */ + return nxd_mqtt_client_connect(&device.mqtt_client, &server_ip, ETH_MQTT_SERVER_PORT, + 1000, NX_TRUE, NX_WAIT_FOREVER); +} +#endif NX_PTP_DATE_TIME ethernet_get_time(void) { NX_PTP_TIME tm; @@ -404,4 +536,31 @@ NX_PTP_DATE_TIME ethernet_get_time(void) { return date; } +UINT ethernet_print_arp_status(void) { + ULONG arp_requests_sent = 100; + ULONG arp_requests_received = 100; + + ULONG arp_responses_sent = 100; + ULONG arp_responses_received; + ULONG arp_dynamic_entries= 100; + ULONG arp_static_entries= 100; + ULONG arp_aged_entries= 100; + ULONG arp_invalid_messages= 100; + UINT status = nx_arp_info_get(&device.ip, &arp_requests_sent, &arp_requests_received, + &arp_responses_sent, &arp_responses_received, + &arp_dynamic_entries, &arp_static_entries, + &arp_aged_entries, &arp_invalid_messages); + if(status != NX_SUCCESS) { + PRINTLN_ERROR("Failed to retrieve ARP info (Status: %d/%s)", status, nx_status_toString(status)); + return status; + } + PRINTLN_INFO("ARP info REQ SENT %lu, REQ RECV %lu, RESP SENT %lu, RESP RECV %lu, DYN ENTRY %lu, S ENTRY %lu, AGED %lu, INVALID %lu", arp_requests_sent, arp_requests_received, + arp_responses_sent, arp_responses_received, + arp_dynamic_entries, arp_static_entries, + arp_aged_entries, arp_invalid_messages); + + return status; +} + + // clang-format on diff --git a/NetX/src/u_nx_protobuf.c b/NetX/src/u_nx_protobuf.c new file mode 100644 index 00000000..0f901ac3 --- /dev/null +++ b/NetX/src/u_nx_protobuf.c @@ -0,0 +1,116 @@ +#include +#include "u_tx_debug.h" +#include "u_nx_protobuf.h" +#include "u_nx_ethernet.h" +#include "pb.h" +#include "pb_encode.h" +#include "tx_api.h" +#include "nxd_mqtt_client.h" + +ethernet_mqtt_message_t _nx_protobuf_mqtt_message_create(const char* topic, size_t topic_len, const char* unit, size_t unit_len, const float values[], int values_size) { + /* Zero-initialize the protobuf struct and the sendable ethernet_mqtt_message_t message. */ + serverdata_v2_ServerData protobuf = serverdata_v2_ServerData_init_zero; + ethernet_mqtt_message_t message = { 0 }; + message.initialized = false; + + /* Enforce topic length. */ + if(topic_len > PB_MAX_TOPIC_LENGTH) { + PRINTLN_ERROR("MQTT Message topic exceeds maximum length of %d (Topic: %s, Current Topic Length: %d).", PB_MAX_TOPIC_LENGTH, topic, topic_len); + return message; // Return empty, uninitialized message. + } + // NOTE: If using the `nx_protobuf_mqtt_message_create()` macro (as intended), the static asserts should catch this. This is just an extra check in case a caller uses this function directly for whatever reason. + + /* Enforce unit length. */ + if(unit_len > PB_MAX_UNIT_LENGTH) { + PRINTLN_ERROR("MQTT Unit string length exceeds maximum length of %d (Topic: %s, Current Unit String Length: %d).", PB_MAX_UNIT_LENGTH, topic, unit_len); + return message; // Return empty, uninitialized message. + } + // NOTE: If using the `nx_protobuf_mqtt_message_create()` macro (as intended), the static asserts should catch this. This is just an extra check in case a caller uses this function directly for whatever reason. + + /* Enforce minimum number of datapoints. */ + if(values_size < PB_MIN_DATAPOINTS) { + PRINTLN_ERROR("Message must have at least %d datapoints (Topic: %s, Current values_size: %d).", PB_MIN_DATAPOINTS, topic, values_size); + return message; // Return empty, uninitialized message. + } + // NOTE: If using the `nx_protobuf_mqtt_message_create()` macro (as intended), the static asserts should catch this. This is just an extra check in case a caller uses this function directly for whatever reason. + + /* Enforce maximum number of datapoints. */ + if(values_size > PB_MAX_DATAPOINTS) { + PRINTLN_ERROR("Message cannot have more than %d datapoints (Topic: %s, Current values_size: %d).", PB_MAX_DATAPOINTS, topic, values_size); + return message; // Return empty, uninitialized message. + } + // NOTE: If using the `nx_protobuf_mqtt_message_create()` macro (as intended), the static asserts should catch this. This is just an extra check in case a caller uses this function directly for whatever reason. + + /* Get the PTP time and convert to appropriate protobuf time. */ + NX_PTP_DATE_TIME datetime = ethernet_get_time(); + uint64_t time_us = datetime.nanosecond; // u_TODO - this is temporary + // u_TODO - actually figure out how to convert this like it should be. Probably ask jack how this should be set up? + + /* Pack the protobuf message. */ + // CURRENT PROTOBUF SCHEMA LOOKS LIKE THIS: + // typedef struct _serverdata_v2_ServerData { + // char unit[15]; + // uint64_t time_us; + // pb_size_t values_count; + // float values[5]; + // } serverdata_v2_ServerData; + memcpy(protobuf.unit, unit, unit_len); + protobuf.time_us = time_us; + protobuf.values_count = values_size; + memcpy(protobuf.values, values, values_size * sizeof(float)); + + /* Pack the `ethernet_mqtt_message_t` object and return it as successfully initialized. */ + message.topic = topic; + message.topic_size = topic_len; + message.protobuf = protobuf; + message.initialized = true; + return message; +} + +int nx_protobuf_mqtt_message_send(ethernet_mqtt_message_t* message) { + /* Make sure message isn't nullptr. */ + if(!message) { + PRINTLN_ERROR("Null pointer to `ethernet_mqtt_message_t` message."); + return U_ERROR; + } + + /* Make sure message is initialized. */ + if(!message->initialized) { + PRINTLN_ERROR("Attempting to send an uninitialized `ethernet_mqtt_message_t` message."); + return U_ERROR; + } + + /* Set up the buffer. */ + unsigned char buffer[serverdata_v2_ServerData_size]; + pb_ostream_t stream = pb_ostream_from_buffer(buffer, sizeof(buffer)); + + /* Encode the protobuf. */ + int status = pb_encode(&stream, serverdata_v2_ServerData_fields, &message->protobuf); + if(status != true) { + PRINTLN_ERROR("Failed to serialize protobuf message (Topic: %s): %s", message->topic, PB_GET_ERROR(&stream)); + return U_ERROR; + } + + /* Publish over MQTT. */ + status = ethernet_mqtt_publish(message->topic, message->topic_size, (char*)buffer, stream.bytes_written); // u_TODO - ethernet_mqtt_publish should return U_SUCCESS/U_ERROR instead of the internal netx error macro + if(status != NXD_MQTT_SUCCESS) { + PRINTLN_WARNING("Failed to publish MQTT message (Topic: %s, Status: %d).", message->topic, status); + + /* If disconnected, attempt reconnection. */ + if(status == NXD_MQTT_NOT_CONNECTED) { + PRINTLN_WARNING("Detected disconnect from MQTT. Attempting reconnection..."); + do { + tx_thread_sleep(1000); + status = ethernet_mqtt_reconnect(); + PRINTLN_WARNING("Attempting MQTT reconnection (Status: %d).", status); + } while ((status != NXD_MQTT_SUCCESS) && (status != NXD_MQTT_ALREADY_CONNECTED)); + PRINTLN_WARNING("MQTT reconnection successful."); + } + + return U_ERROR; + } + + /* Return successful! */ + PRINTLN_INFO("Sent MQTT message (Topic: %s).", message->topic); + return U_SUCCESS; +} \ No newline at end of file diff --git a/dev/nanopb/CMakeLists.txt b/dev/nanopb/CMakeLists.txt new file mode 100644 index 00000000..7a531d71 --- /dev/null +++ b/dev/nanopb/CMakeLists.txt @@ -0,0 +1,8 @@ +cmake_minimum_required(VERSION 3.22) + +set(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/nanopb/extra) +find_package(Nanopb REQUIRED) + +nanopb_generate_cpp(TARGET proto serverdata.proto) + +target_link_libraries(${CMAKE_PROJECT_NAME} proto) diff --git a/dev/nanopb/nanopb b/dev/nanopb/nanopb new file mode 160000 index 00000000..c716db13 --- /dev/null +++ b/dev/nanopb/nanopb @@ -0,0 +1 @@ +Subproject commit c716db13070bfb7de03b33f5a6558528cbf8a249 diff --git a/dev/nanopb/serverdata.options b/dev/nanopb/serverdata.options new file mode 100644 index 00000000..77860c7c --- /dev/null +++ b/dev/nanopb/serverdata.options @@ -0,0 +1,2 @@ +serverdata.v2.ServerData.unit max_size: 15 +serverdata.v2.ServerData.values max_count: 5 diff --git a/dev/nanopb/serverdata.proto b/dev/nanopb/serverdata.proto new file mode 100644 index 00000000..3d74e6b0 --- /dev/null +++ b/dev/nanopb/serverdata.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package serverdata.v2; + +message ServerData { + // ensure old type is reserved + reserved 1; + reserved "value"; + + string unit = 2; + // time since unix epoch in MICROSECONDS + uint64 time_us = 3; + repeated float values = 4; +} diff --git a/patches/0322026_nx_stm32_query_status.patch b/patches/0322026_nx_stm32_query_status.patch new file mode 100644 index 00000000..d070f5d3 --- /dev/null +++ b/patches/0322026_nx_stm32_query_status.patch @@ -0,0 +1,30 @@ +diff --git a/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.c b/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.c +index d93161a..4e35bca 100644 +--- a/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.c ++++ b/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.c +@@ -265,6 +265,12 @@ NX_INTERFACE *interface_ptr; + } + #endif /* NX_ENABLE_INTERFACE_CAPABILITY */ + ++ case NX_DRIVER_GET_STATE: ++ { ++ *(driver_req_ptr->nx_ip_driver_return_ptr) = nx_driver_information.nx_driver_information_state; ++ break; ++ } ++ + default: + + +diff --git a/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.h b/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.h +index 0198ed9..a94fa14 100644 +--- a/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.h ++++ b/Middlewares/ST/netxduo/common/drivers/ethernet/nx_stm32_eth_driver.h +@@ -72,6 +72,8 @@ extern "C" { + #define NX_DRIVER_STATE_INITIALIZED 3 + #define NX_DRIVER_STATE_LINK_ENABLED 4 + ++#define NX_DRIVER_GET_STATE 51 ++ + #define NX_DRIVER_ERROR 90 + +