Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions include/neuron/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,49 @@ static inline int neu_req_add_tag_copy(neu_req_add_tag_t *dst,
typedef struct {
uint16_t index;
int error;
} neu_resp_tag_op_result_t;

inline static UT_icd *neu_resp_tag_op_result_icd()
{
static UT_icd icd = { sizeof(neu_resp_tag_op_result_t), NULL, NULL, NULL };
return &icd;
}

typedef struct {
uint16_t index;
int error;
UT_array *results; // array of neu_resp_tag_op_result_t
} neu_resp_add_tag_t, neu_resp_update_tag_t;

static inline int neu_resp_add_tag_result(neu_resp_add_tag_t *resp,
uint16_t index, int error)
{
neu_resp_tag_op_result_t result = {
.index = index,
.error = error,
};

if (resp->results == NULL) {
utarray_new(resp->results, neu_resp_tag_op_result_icd());
if (resp->results == NULL) {
return -1;
}
}

utarray_push_back(resp->results, &result);
resp->index = index;
resp->error = error;
return 0;
}

static inline void neu_resp_add_tag_result_fini(neu_resp_add_tag_t *resp)
{
if (resp->results != NULL) {
utarray_free(resp->results);
resp->results = NULL;
}
}

typedef struct neu_req_del_tag {
char driver[NEU_NODE_NAME_LEN];
char group[NEU_GROUP_NAME_LEN];
Expand Down
145 changes: 80 additions & 65 deletions plugins/restful/datatag_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,78 +49,78 @@ void handle_add_tags(nng_aio *aio)
for (int i = 0; i < req->n_tag; i++) {
for (int k = 0; k < i; k++) {
if (strcmp(req->tags[i].name, req->tags[k].name) == 0) {
add_resp.index = i;
add_resp.error = NEU_ERR_TAG_NAME_CONFLICT;
neu_resp_add_tag_result(&add_resp, i,
NEU_ERR_TAG_NAME_CONFLICT);
break;
}
}
}
if (add_resp.error > 0) {

for (int i = 0; i < req->n_tag; i++) {
if (req->tags[i].type == NEU_TYPE_STRING ||
req->tags[i].type == NEU_TYPE_BOOL ||
req->tags[i].type == NEU_TYPE_BIT ||
req->tags[i].type == NEU_TYPE_TIME ||
req->tags[i].type == NEU_TYPE_DATA_AND_TIME ||
req->tags[i].type == NEU_TYPE_CUSTOM) {
if (req->tags[i].precision > 0) {
neu_resp_add_tag_result(
&add_resp, i, NEU_ERR_TAG_PRECISION_INVALID);
}
if (req->tags[i].decimal > 0) {
neu_resp_add_tag_result(
&add_resp, i, NEU_ERR_TAG_DECIMAL_INVALID);
}
if (req->tags[i].bias > 0) {
neu_resp_add_tag_result(&add_resp, i,
NEU_ERR_TAG_BIAS_INVALID);
}
}
}

if (add_resp.results != NULL &&
utarray_len(add_resp.results) > 0) {
neu_resp_tag_op_result_t *first =
(neu_resp_tag_op_result_t *) utarray_eltptr(
add_resp.results, 0);
add_resp.index = first->index;
add_resp.error = first->error;
handle_add_tags_resp(aio, &add_resp);
} else {
header.ctx = aio;
header.type = NEU_REQ_ADD_TAG;
strncpy(cmd.driver, req->node, NEU_NODE_NAME_LEN - 1);
strncpy(cmd.group, req->group, NEU_GROUP_NAME_LEN - 1);
cmd.n_tag = req->n_tag;
cmd.tags = calloc(req->n_tag, sizeof(neu_datatag_t));

for (int i = 0; i < req->n_tag; i++) {
if (req->tags[i].type == NEU_TYPE_STRING ||
req->tags[i].type == NEU_TYPE_BOOL ||
req->tags[i].type == NEU_TYPE_BIT ||
req->tags[i].type == NEU_TYPE_TIME ||
req->tags[i].type == NEU_TYPE_DATA_AND_TIME ||
req->tags[i].type == NEU_TYPE_CUSTOM) {
if (req->tags[i].precision > 0) {
add_resp.index = i;
add_resp.error = NEU_ERR_TAG_PRECISION_INVALID;
break;
}
if (req->tags[i].decimal > 0) {
add_resp.index = i;
add_resp.error = NEU_ERR_TAG_DECIMAL_INVALID;
break;
}
if (req->tags[i].bias > 0) {
add_resp.index = i;
add_resp.error = NEU_ERR_TAG_BIAS_INVALID;
break;
}
cmd.tags[i].attribute = req->tags[i].attribute;
cmd.tags[i].type = req->tags[i].type;
cmd.tags[i].precision = req->tags[i].precision;
cmd.tags[i].decimal = req->tags[i].decimal;
cmd.tags[i].bias = req->tags[i].bias;
cmd.tags[i].address = strdup(req->tags[i].address);
cmd.tags[i].name = strdup(req->tags[i].name);
if (req->tags[i].description != NULL) {
cmd.tags[i].description =
strdup(req->tags[i].description);
} else {
cmd.tags[i].description = strdup("");
}
}

if (add_resp.error > 0) {
handle_add_tags_resp(aio, &add_resp);
} else {
header.ctx = aio;
header.type = NEU_REQ_ADD_TAG;
strncpy(cmd.driver, req->node, NEU_NODE_NAME_LEN - 1);
strncpy(cmd.group, req->group, NEU_GROUP_NAME_LEN - 1);
cmd.n_tag = req->n_tag;
cmd.tags = calloc(req->n_tag, sizeof(neu_datatag_t));

for (int i = 0; i < req->n_tag; i++) {
cmd.tags[i].attribute = req->tags[i].attribute;
cmd.tags[i].type = req->tags[i].type;
cmd.tags[i].precision = req->tags[i].precision;
cmd.tags[i].decimal = req->tags[i].decimal;
cmd.tags[i].bias = req->tags[i].bias;
cmd.tags[i].address = strdup(req->tags[i].address);
cmd.tags[i].name = strdup(req->tags[i].name);
if (req->tags[i].description != NULL) {
cmd.tags[i].description =
strdup(req->tags[i].description);
} else {
cmd.tags[i].description = strdup("");
}
if (req->tags[i].unit == NULL) {
cmd.tags[i].unit = strdup("");
} else {
cmd.tags[i].unit = strdup(req->tags[i].unit);
}
if (req->tags[i].unit == NULL) {
cmd.tags[i].unit = strdup("");
} else {
cmd.tags[i].unit = strdup(req->tags[i].unit);
}
}

int ret = neu_plugin_op(plugin, header, &cmd);
if (ret != 0) {
NEU_JSON_RESPONSE_ERROR(NEU_ERR_IS_BUSY, {
neu_http_response(aio, NEU_ERR_IS_BUSY,
result_error);
});
}
int ret = neu_plugin_op(plugin, header, &cmd);
if (ret != 0) {
NEU_JSON_RESPONSE_ERROR(NEU_ERR_IS_BUSY, {
neu_http_response(aio, NEU_ERR_IS_BUSY,
result_error);
});
}
}
}
Expand All @@ -129,16 +129,29 @@ void handle_add_tags(nng_aio *aio)

void handle_add_tags_resp(nng_aio *aio, neu_resp_add_tag_t *resp)
{
neu_json_add_tag_res_t res = { 0 };
char * result = NULL;
neu_json_add_tag_res_t res = { 0 };
neu_json_tag_op_res_item_t fallback_result;
char * result = NULL;

res.error = resp->error;
res.index = resp->index;
res.error = resp->error;

if (resp->results != NULL && utarray_len(resp->results) > 0) {
res.n_result = utarray_len(resp->results);
res.results =
(neu_json_tag_op_res_item_t *) utarray_front(resp->results);
} else if (resp->error != 0) {
fallback_result.index = resp->index;
fallback_result.error = resp->error;
res.n_result = 1;
res.results = &fallback_result;
}

neu_json_encode_by_fn(&res, neu_json_encode_au_tags_resp, &result);

NEU_JSON_RESPONSE_ERROR(resp->error,
{ neu_http_response(aio, resp->error, result); });
neu_resp_add_tag_result_fini(resp);
free(result);
}

Expand Down Expand Up @@ -270,6 +283,7 @@ void handle_add_gtags_resp(nng_aio *aio, neu_resp_add_tag_t *resp)
neu_json_encode_by_fn(&res, neu_json_encode_au_gtags_resp, &result);
NEU_JSON_RESPONSE_ERROR(resp->error,
{ neu_http_response(aio, resp->error, result); });
neu_resp_add_tag_result_fini(resp);
free(result);
}

Expand Down Expand Up @@ -315,6 +329,7 @@ void handle_import_tags_resp(nng_aio *aio, neu_resp_add_tag_t *resp)
neu_json_encode_by_fn(&res, neu_json_encode_import_tags_resp, &result);
NEU_JSON_RESPONSE_ERROR(resp->error,
{ neu_http_response(aio, resp->error, result); });
neu_resp_add_tag_result_fini(resp);
free(result);
}

Expand Down
39 changes: 29 additions & 10 deletions src/adapter/adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -1569,25 +1569,31 @@ static int adapter_loop(enum neu_event_io_type type, int fd, void *usr_data)
break;
}
case NEU_REQ_ADD_TAG: {
neu_req_add_tag_t *cmd = (neu_req_add_tag_t *) &header[1];
neu_resp_add_tag_t resp = { 0 };
neu_req_add_tag_t *cmd = (neu_req_add_tag_t *) &header[1];
neu_resp_add_tag_t resp = { 0 };
uint16_t added_count = 0;

if (adapter->module->type == NEU_NA_TYPE_DRIVER) {
for (int i = 0; i < cmd->n_tag; i++) {
int ret = neu_adapter_driver_validate_tag(
(neu_adapter_driver_t *) adapter, cmd->group,
&cmd->tags[i]);
if (ret == 0) {
resp.index += 1;
added_count += 1;
} else {
resp.error = ret;
neu_resp_add_tag_result(&resp, i, ret);
}
}
} else {
resp.error = NEU_ERR_GROUP_NOT_ALLOW;
neu_resp_add_tag_result(&resp, 0, NEU_ERR_GROUP_NOT_ALLOW);
}

if (resp.error != 0) {
resp.index = added_count;
if (resp.results != NULL && utarray_len(resp.results) > 0) {
neu_resp_tag_op_result_t *first =
(neu_resp_tag_op_result_t *) utarray_eltptr(resp.results, 0);
resp.error = first->error;
for (uint16_t i = 0; i < cmd->n_tag; i++) {
neu_tag_fini(&cmd->tags[i]);
}
Expand All @@ -1606,8 +1612,9 @@ static int adapter_loop(enum neu_event_io_type type, int fd, void *usr_data)
neu_tag_fini(&cmd->tags[i]);
}
neu_msg_exchange(header);
resp.index = resp.error - 1;
resp.error = NEU_ERR_TAG_NAME_CONFLICT;
resp.index = resp.error - 1;
resp.error = NEU_ERR_TAG_NAME_CONFLICT;
neu_resp_add_tag_result(&resp, resp.index, resp.error);
header->type = NEU_RESP_ADD_TAG;
reply(adapter, header, &resp);
free(cmd->tags);
Expand All @@ -1622,26 +1629,32 @@ static int adapter_loop(enum neu_event_io_type type, int fd, void *usr_data)
neu_tag_fini(&cmd->tags[i]);
}
resp.index = 0;
neu_resp_add_tag_result(&resp, 0, resp.error);
neu_msg_exchange(header);
header->type = NEU_RESP_ADD_TAG;
reply(adapter, header, &resp);
free(cmd->tags);
break;
}

added_count = 0;
for (int i = 0; i < cmd->n_tag; i++) {
int ret = neu_adapter_driver_add_tag(
(neu_adapter_driver_t *) adapter, cmd->group, &cmd->tags[i],
NEU_DEFAULT_GROUP_INTERVAL);
if (ret != 0) {
neu_adapter_driver_try_del_tag((neu_adapter_driver_t *) adapter,
cmd->n_tag - i);
resp.index = i;
resp.error = ret;
added_count = i;
resp.error = ret;
neu_resp_add_tag_result(&resp, i, ret);
break;
}
added_count += 1;
}

resp.index = added_count;

for (uint16_t i = resp.index; i < cmd->n_tag; i++) {
neu_tag_fini(&cmd->tags[i]);
}
Expand Down Expand Up @@ -1762,6 +1775,7 @@ static int adapter_loop(enum neu_event_io_type type, int fd, void *usr_data)
case NEU_REQ_UPDATE_TAG: {
neu_req_update_tag_t *cmd = (neu_req_update_tag_t *) &header[1];
neu_resp_update_tag_t resp = { 0 };
uint16_t updated_count = 0;

if (adapter->module->type == NEU_NA_TYPE_DRIVER) {

Expand All @@ -1777,20 +1791,25 @@ static int adapter_loop(enum neu_event_io_type type, int fd, void *usr_data)
adapter_storage_update_tag(cmd->driver, cmd->group,
&cmd->tags[i]);

resp.index += 1;
updated_count += 1;
} else {
resp.error = ret;
neu_resp_add_tag_result(&resp, i, ret);
break;
}
} else {
resp.error = ret;
neu_resp_add_tag_result(&resp, i, ret);
break;
}
}
} else {
resp.error = NEU_ERR_GROUP_NOT_ALLOW;
neu_resp_add_tag_result(&resp, 0, NEU_ERR_GROUP_NOT_ALLOW);
}

resp.index = updated_count;

for (uint16_t i = resp.index; i < cmd->n_tag; i++) {
neu_tag_fini(&cmd->tags[i]);
}
Expand Down
Loading
Loading