Skip to content

Commit e116ee4

Browse files
eeffflyfish30
authored andcommitted
Make default persist adapter load group configs
1 parent 5ccbf1b commit e116ee4

File tree

3 files changed

+121
-33
lines changed

3 files changed

+121
-33
lines changed

src/adapter/adapter.c

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,53 @@ static int persister_singleton_load_setting(neu_adapter_t *adapter,
260260
return rv;
261261
}
262262

263+
static int persister_singleton_load_grp_and_tags(neu_adapter_t *adapter,
264+
const char * adapter_name,
265+
neu_node_id_t node_id)
266+
{
267+
vector_t * group_config_infos = NULL;
268+
neu_persister_t *persister = persister_singleton_get();
269+
270+
int rv = neu_persister_load_group_configs(persister, adapter_name,
271+
&group_config_infos);
272+
if (0 != rv) {
273+
log_error("%s fail load grp config of %s", adapter->name, adapter_name);
274+
return rv;
275+
}
276+
277+
VECTOR_FOR_EACH(group_config_infos, iter)
278+
{
279+
neu_persist_group_config_info_t *p = iterator_get(&iter);
280+
281+
neu_taggrp_config_t *grp_config =
282+
neu_taggrp_cfg_new(p->group_config_name);
283+
if (NULL == grp_config) {
284+
rv = NEU_ERR_ENOMEM;
285+
break;
286+
}
287+
neu_taggrp_cfg_set_interval(grp_config, p->read_interval);
288+
289+
neu_cmd_add_grp_config_t cmd = {
290+
.node_id = node_id,
291+
.grp_config = grp_config,
292+
};
293+
294+
rv = neu_manager_add_grp_config(adapter->manager, &cmd);
295+
const char *ok_or_err = (0 == rv) ? "success" : "fail";
296+
log_info("%s load group config %s interval:%d", adapter->name,
297+
p->group_config_name, p->read_interval);
298+
if (0 != rv) {
299+
neu_taggrp_cfg_free(grp_config);
300+
break;
301+
}
302+
303+
// TODO: add tags
304+
}
305+
306+
neu_persist_group_config_infos_free(group_config_infos);
307+
return rv;
308+
}
309+
263310
static int persister_singleton_load_data(neu_adapter_t *adapter)
264311
{
265312
vector_t * adapter_infos = NULL;
@@ -303,6 +350,12 @@ static int persister_singleton_load_data(neu_adapter_t *adapter)
303350
goto error_add_adapters;
304351
}
305352

353+
rv = persister_singleton_load_grp_and_tags(adapter, adapter_info->name,
354+
node_id);
355+
if (0 != rv) {
356+
goto error_add_adapters;
357+
}
358+
306359
// TODO: start node according to state
307360
}
308361

src/persist/persist.c

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,43 @@ static int read_file_string(const char *fn, char **out)
410410
return NEU_ERR_FAILURE;
411411
}
412412

413+
// file tree walking callback for collecting adapter group config infos
414+
static int read_group_config_cb(const char *fpath, bool is_dir, void *arg)
415+
{
416+
vector_t *group_config_infos = arg;
417+
int rv = 0;
418+
419+
if (is_dir) {
420+
return 0;
421+
}
422+
423+
char *json_str = NULL;
424+
rv = read_file_string(fpath, &json_str);
425+
if (0 != rv) {
426+
return rv;
427+
}
428+
429+
neu_json_group_configs_req_t *group_config_req = NULL;
430+
rv = neu_json_decode_group_configs_req(json_str, &group_config_req);
431+
free(json_str);
432+
if (0 != rv) {
433+
return rv;
434+
}
435+
436+
log_info("read %s", fpath);
437+
438+
if (0 == vector_push_back(group_config_infos, group_config_req)) {
439+
// NOTE: do not call neu_json_decode_group_configs_req_free,
440+
// since member ownership was transferred to vector
441+
free(group_config_req);
442+
} else {
443+
neu_json_decode_group_configs_req_free(group_config_req);
444+
rv = NEU_ERR_ENOMEM;
445+
}
446+
447+
return rv;
448+
}
449+
413450
typedef struct neu_persister {
414451
const char *persist_dir;
415452
const char *adapters_fname;
@@ -841,48 +878,30 @@ int neu_persister_store_group_config(
841878

842879
int neu_persister_load_group_configs(neu_persister_t *persister,
843880
const char * adapter_name,
844-
const char * group_config_name,
845881
vector_t ** group_config_infos)
846882
{
847-
char *group_configs = NULL;
883+
char path[PATH_MAX_SIZE] = { 0 };
848884

849-
char group_config_file[128] = { 0 };
850-
int rv = snprintf(group_config_file, 128, "%s/persist/%s/group_configs/%s",
851-
persister->persist_dir, adapter_name, group_config_name);
852-
if (sizeof(group_config_file) == rv) {
853-
log_error("group_config_file exceeds maximum value");
885+
int n = persister_group_configs_dir(path, sizeof(path), persister,
886+
adapter_name);
887+
if (sizeof(path) == n) {
888+
log_error("persister path too long: %s", path);
854889
return -1;
855890
}
856891

857-
rv = read_file_string(group_config_file, &group_configs);
858-
if (rv != 0) {
859-
return rv;
892+
vector_t *result = vector_new(0, sizeof(neu_persist_group_config_info_t));
893+
if (NULL == result) {
894+
return NEU_ERR_ENOMEM;
860895
}
861896

862-
neu_json_group_configs_req_t *group_config_req = NULL;
863-
rv = neu_json_decode_group_configs_req(group_configs, &group_config_req);
864-
if (rv != 0) {
865-
return rv;
897+
int rv = file_tree_walk(path, read_group_config_cb, result);
898+
if (0 == rv) {
899+
*group_config_infos = result;
900+
} else {
901+
neu_persist_group_config_infos_free(result);
866902
}
867903

868-
// vector_t *vec = vector_new_move_from_buf(
869-
// group_config_req->datatag_names, group_config_req->datatag_names,
870-
// group_config_req->group_config_name,
871-
// group_config_req->n_datatag_name, group_config_req->read_interval,
872-
// sizeof(neu_persist_group_config_info_t));
873-
// if (vec == NULL) {
874-
// return -1;
875-
// }
876-
877-
// *group_config_infos = vec;
878-
// group_config_req->adapter_name = NULL;
879-
// group_config_req->datatag_names = NULL;
880-
// group_config_req->group_config_name = NULL;
881-
// group_config_req->n_datatag_name = 0;
882-
// group_config_req->read_interval = 0;
883-
884-
neu_json_decode_group_configs_req_free(group_config_req);
885-
return 0;
904+
return rv;
886905
}
887906

888907
int neu_persister_delete_group_config(neu_persister_t *persister,

src/persist/persist.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,23 @@ static inline void neu_persist_plugin_infos_free(vector_t *plugin_infos)
6161
vector_free(plugin_infos);
6262
}
6363

64+
static inline void
65+
neu_persist_group_config_infos_free(vector_t *group_config_infos)
66+
{
67+
VECTOR_FOR_EACH(group_config_infos, iter)
68+
{
69+
neu_persist_group_config_info_t *p = iterator_get(&iter);
70+
char ** tag_names = p->datatag_names;
71+
for (int i = 0; i < p->n_datatag_name; ++i) {
72+
free(tag_names[i]);
73+
}
74+
free(tag_names);
75+
free(p->group_config_name);
76+
free(p->adapter_name);
77+
}
78+
vector_free(group_config_infos);
79+
}
80+
6481
/**
6582
* Persister, provide methods to persist data */
6683
typedef struct neu_persister neu_persister_t;
@@ -188,7 +205,6 @@ int neu_persister_store_group_config(
188205
*/
189206
int neu_persister_load_group_configs(neu_persister_t *persister,
190207
const char * adapter_name,
191-
const char * group_config_name,
192208
vector_t ** group_config_infos);
193209
/**
194210
* Delete group config.

0 commit comments

Comments
 (0)