Collectd 源码解析

2016-08-26 Friday     linux , misc

接下来介绍下 Collectd 中源码的实现。

简介

在编译源码前,需要先安装部分依赖库。

# yum install libtool-ltdl-devel flex bison

接着介绍下源码编译时常用的命令。

----- 如果修改了configure.ac
$ autoreconf -ivf

----- 修改Makefile.am
$ automake --add-missing --copy

----- 编译测试##默认关闭所有插件,指定需要编译的插件
$ mkdir build && cd build
$ ../configure --enable-debug --enable-all-plugins=no \
    --enable-logfile --enable-unixsock --enable-write-log \
    --enable-cpu --enable-load --enable-contextswitch \
    --enable-memory
$ make
$ make distclean

----- 单元测试##
$ make check

----- 压测工具,会随机生成host、plugin、values,尽量模拟正常的采集流量
$ collectd-tg

----- 源码发布打包
$ make distcheck

与测试相关的宏定义在 testing.h 文件中,执行 make check 需要定义 TESTS 变量,可以指定多个,如果返回非零则表示失败,详细可以查看 Support for test suites

对于 man 文档,通过 man_MANS 变量定义,然后会根据后缀名自动安装到相应的系统目录。

打包时可以通过 EXTRA_DIST 变量指定比较特殊的一些文件。

常用概念

在介绍源码的实现前,首先看下一些常见的概念。

插件采集的数据保存在 value_list_t 中,这个对应了 types.db 中的一行记录,types.db 中一行可能会包含了多个采集结构,例如 load 指标,那么与之对应 value_list_t 也可能会包含了多个采集值。

标示符 (Identifier)

value_list_t 中还包含了用于构建标示符 (Identifier) 的值,包括了: host、plugin、plugin instance (optional)、type、type instance (optional) 五部分,其中两个是可选的,其含义如下:

  • host 主机名称,可以再插件中设置,否则直接获取配置文件中的主机名;
  • plugin 一般是插件名称;
  • plugin instance 可选,例如 CPU 在多核的场景;
  • type 很多还是和插件相同,感觉有些冲突;
  • type instance 可选,监控的不同类型,例如 CPU 的指标有 idle、usage 等。

其中每项命名的最大值为 64,建议不要使用 '/''-',格式化时通过 format_name() 实现,串行化的格式为:host "/" plugin ["-" plugin instance] "/" type ["-" type instance]

如下,是几个常见的示例:

localhost/cpu-0/cpu-idle
localhost/memory/memory-used
wanda/disk-hdc1/disk_octets
leeloo/load/load

常见的在 global cache 中会使用,用于保存最近保存的一次数据,与 notification 相关的参数。

个人建议使用 Plugin + Plugin Instance + types.db 这种方式定义 Identifier 。

采集值类型

Collectd 中总共有四种类型,简单介绍如下。

  • GAUSE: 直接使用采样值,通常为温度、内存使用率等参数值;
  • DERIVE: 计算采样的速率,计算公式为 rate=(value_new-value_old)/(time_new-time_old) ,需要注意,如果值非递增,那么可能产生负值;
  • COUNTER: 与 DERIVE 相同,只是当 value_new<value_old 时则认为是 “wrapped around”
  • ABSOULUTE: 与 GAUSE 比较相似,只是该值是无符号整数,且会除以当前时间与上次采样时间的差值;

需要注意的是,COUNTER 值采集的三种特殊情况:1) 计数器达到最大值,导致回环;2) 监控服务维护(重启或者手动计数器清零);3) 监控 agent 重启,导致上次计数丢失。这三种情况可能会导致采集指标异常,通常只有一个采集点,一般可以过滤或者忽略掉。

源码实现

相关的代码内容如下。

#define DS_TYPE_COUNTER 0
#define DS_TYPE_GAUGE 1
#define DS_TYPE_DERIVE 2
#define DS_TYPE_ABSOLUTE 3

typedef unsigned long long counter_t;
typedef double gauge_t;
typedef int64_t derive_t;
typedef uint64_t absolute_t;

union value_u {
  counter_t counter;
  gauge_t gauge;
  derive_t derive;
  absolute_t absolute;
};
typedef union value_u value_t;

int value_to_rate(gauge_t *ret_rate,
                  value_t value, int ds_type, cdtime_t t,
                  value_to_rate_state_t *state) {
  gauge_t interval;

  /* Another invalid state: The time is not increasing. */
  if (t <= state->last_time) {
    memset(state, 0, sizeof(*state));
    return (EINVAL);
  }

  interval = CDTIME_T_TO_DOUBLE(t - state->last_time);

  /* Previous value is invalid. */
  if (state->last_time == 0) {
    state->last_value = value;
    state->last_time = t;
    return (EAGAIN);
  }

  switch (ds_type) {
  case DS_TYPE_DERIVE: {
    derive_t diff = value.derive - state->last_value.derive;
    *ret_rate = ((gauge_t)diff) / ((gauge_t)interval);
    break;
  }
  case DS_TYPE_GAUGE: {
    *ret_rate = value.gauge;
    break;
  }
  case DS_TYPE_COUNTER: {
    counter_t diff = counter_diff(state->last_value.counter, value.counter);
    *ret_rate = ((gauge_t)diff) / ((gauge_t)interval);
    break;
  }
  case DS_TYPE_ABSOLUTE: {
    absolute_t diff = value.absolute;
    *ret_rate = ((gauge_t)diff) / ((gauge_t)interval);
    break;
  }
  default:
    return EINVAL;
  }

  state->last_value = value;
  state->last_time = t;
  return (0);
}

插件实现

首先,简单说下插件是如何加载的。

有个配置项 AutoLoadPlugin false 用于控制是否会自动加载插件,默认需要先通过 LoadPlugin 指令加载插件,然后再在 <Plugin XXX> 中进行配置,否则会报错;当然也可以将该参数修改为 true ,然后在遇到 <Plugin XXX> 时自动加载。

源码中有一个示例插件,可查看 contrib/examples 目录下的文件;当然,也可以参考 官方文档

插件的实现实际上主要是通过将回调函数注册到链表上。

int plugin_register_init()<<<1>>>
注册初始化函数,正式创建读写线程之前执行。
使用链表: list_init

int plugin_register_config()<<<2>>>
简单的配置处理,只含有配置项以及回调函数。
使用链表: first_callback

int plugin_register_complex_config()<<<2>>>
通过一个注册的回调函数进行配置解析,在链表末尾写入;该函数会对配置项进行检测,包括了一些复杂的逻辑处理。
使用链表: complex_callback_head

int plugin_register_flush()<<<3>>>
有部分插件会缓存数据,例如network、rrdtool等,通过函数注册到链表,在执行 flush 命令时会调用链表上的各个函数。
使用链表: list_flush,通过plugin_flush()函数调用

int plugin_register_read()<<<3>>>
int plugin_register_complex_read()
两个函数的区别在于回调函数中是否需要传入参数,详见plugin_read_thread()中的调用。
使用链表:read_list

int plugin_register_write()<<<3>>>
写入插件的回调函数注册。
使用链表:list_write

int plugin_register_missing()<<<3>>>
会在每次的时间间隔检查是否有数据丢失,当发现有数据丢失时会调用注册的函数。
使用链表:list_missing

int plugin_register_shutdown()<<<4>>>
通过plugin_shutdown_all()函数调用。
使用链表:list_shutdown

新增插件

首先,新增 autoconf 的配置。

需要通过自定义的宏 AC_PLUGIN() 添加插件编译,其中第一个参数为 --enable-XXX 使用,第二个表示是否需要编译 (例如是否依赖的库存在,如果出了C99/POSIX不依赖其它则设置为 “yes” 即可),第三个参数为注释,用于 ./configure --help

如下是简单的示例。

AC_PLUGIN([name],                $with_dependency,          [description])
AC_PLUGIN([xencpu],              [$plugin_xencpu],          [Xen Host CPU usage])
AC_PLUGIN([xmms],                [$with_libxmms],           [XMMS statistics])
AC_PLUGIN([zookeeper],           [yes],                     [Zookeeper statistics])

添加上述的宏之后,会增加如下功能:

  • 使用 configure 命令配置时,可以通过 --enable-XXX --disable-XXX 或者 --enable-XXX=force 参数配置是否需要编译
  • 在 Makefile.am 中可以通过 if BUILD_PLUGIN_XXX 判断是否需要编译生成动态库;
  • 最终生成的配置头文件 src/config.h 中含有 HAVE_PLUGIN_XXX 定义宏。

接着增加 automake 配置。

在如上的 AC_PLUGIN() 宏中,会通过 AM_CONDITIONAL() 定义一个 BUILD_PLUGIN_XXX,不同的目录下都有相应的 Makefile.am 文件;例如,要增加一个插件,可以使用类似如下配置。

if BUILD_PLUGIN_FOOBAR
    pkglib_LTLIBRARIES += foobar.la
    foobar_la_SOURCES = foobar.c
    foobar_la_LDFLAGS = -module -avoid-version
    collectd_LDADD += "-dlopen" foobar.la
    collectd_DEPENDENCIES += foobar.la
endif

然后就可以通过 autoreconf + automake 重新生成。

源码详解

介绍下调用流程。

内存管理

这里以 load 插件为例。

  1. 通过 plugin_register_read() 函数注册到链表中,调用回调函数采集指标;
  2. 回调函数,load 通过 VALUE_LIST_INIT 宏初始化,也就是初始化一个本地的临时变量,然后通过 plugin_dispatch_values() 函数发送给 collectd 核心;
  3. plugin_write_enqueue() 函数中主要的内存分配函数,包括了 malloc(sizeof(write_queue_t)) + malloc(sizeof(value_list_t)) + calloc(len,sizeof(values)) + calloc(1,sizeof(meta_data_t)),如果中间有内存分配失败则释放;
  4. 而写线程插件会阻塞在 plugin_write_dequeue() 中,该函数会释放 sfree(write_queue_t) ,然后返回 value_list_t 指针;
  5. 线程的主循环 plugin_write_thread() 函数中,在执行完分发后会调用 plugin_value_list_free() 释放之前申请的主要内存。

calloc() 会将申请的缓存初始化为 0 ,而 malloc() 不会初始化内存。

调用流程

首先看下整个软件调用流程。

main()
 | <<<<<<<<<========= 读取配置文件
 |-plugin_init_ctx()                            <ctx>
 |-cf_read()                                  ← 读取配置文件
 | |-cf_read_generic()                        ← 判断文件是否存在
 | | |-wordexp()                              ← 文件扩展,确保文件存在
 | | |-stat()                                 ← 判断是文件夹还是文件
 | | |-cf_read_file()                         ← 读取配置文件内容
 | | | |-oconfig_parse_file()                 ← 解析配置项
 | | | | |-fopen()
 | | | | |-oconfig_parse_fh()
 | | | | | |-yyset_in()                       ← 入参是FILE*类型
 | | | | | |-yyparse()                        ← 使用LEX+YACC进行词法+语法解析
 | | | | | |-yyset_in()                       ← 处理完成,修改为NULL值
 | | | | |-fclose()
 | | | |-cf_include_all()                     ← 在解析完上述配置文件后查看是否有Include选项
 | | |-cf_read_dir()                          ← 如果是目录,会递归调用cf_read_generic()
 | | |-cf_ci_append_children()                ← 添加到root中
 | |
 | |-dispatch_value()                         ← 没有子配置项,一般为全局配置
 | | |-dispatch_global_option()               ← 全局配置,也就是cf_global_options变量中的定义
 | | | |-global_option_set()                  ← 设置全局变量
 | | |-dispatch_value_typesdb()               ← 对于cf_value_map中的定义,例如Typesdb,解析types.db
 | |   |-read_types_list()                    ← 读取一个配置文件,可以通过TypesDB指定多个配置文件
 | |     |-fopen()
 | |     |-parse_file()
 | |     | |-fgets()
 | |     | |-parse_line()
 | |     |   |-plugin_register_data_set()     ← 注册data_set_t类型,通过avl保存在data_sets全局变量中
 | |     |-fclose()
 | |
 | |-dispatch_block()                         ← 有子配置项时,也就是配置块
 | | |-dispatch_loadplugin()                  ← LoadPlugin,会调用plugin_load()
 | | | |-plugin_set_ctx()                    <ctx>获取插件上下文
 | | | |-plugin_load()
 | | |   |-strcasecmp()                       ← 对于python、perl插件,需要做部分初始化操作
 | | |   |-plugin_load_file()
 | | |     |-lt_dlsym()                       ← 调用各个插件的module_register()函数
 | | |       |-plugin_register_complex_read() ← 会生成read_func_t对象
 | | |         |-plugin_insert_read()         ← 写入到list以及heap
 | | | |-plugin_set_ctx()                    获取插件上下文
 | | |
 | | |-dispatch_block_plugin()                ← Plugin,会调用plugin_load()
 | | | |-plugin_load()                        ← 需要配置AutoLoadPlugin(true)参数
 | | | | ### 调用complex_callback_head链表
 | | | |-dispatch_value_plugin()
 | | |   |-cf_dispatch()
 | | |     |-cf_search()                      ← 查找first_callback链表中的回调函数
 | | |
 | | |-fc_configure()                         ← Chain,模块
 | |   |-fc_init_once()
 | |   |-fc_config_add_chain()
 | |-oconfig_free()                           ← 释放配置项
 | | |-oconfig_free_all()                     ← 释放配置,会递归调用
 | |-read_types_list()                        ← 如果配置文件中没有指定types.db,则加载默认
 |
 | <<<<<<<<<========= 系统初始化
 |-change_basedir()                           ← 如果设置了basedir,则尝试切换
 | |-chdir()                                  ← 切换到指定目录下
 |-init_global_variables()                    ← 初始化全局变量,如interval_g、timeout_g、hostname
 | |-cf_get_default_interval()                ← 设置全局的时间间隔
 | |-init_hostname()                          ← 获取主机名
 | | |-global_option_get()                    ← 尝试从配置文件中通过Hostname选项获取主机名
 | | |-gethostname()                          ← 没有配置则尝试通过该系统调用获取
 | | |-global_option_get()                    ← 通过FQDNLookup确认是否需要查找主机名
 | | |-getaddrinfo()                          ← 如果上述配置为true
 | |-return()                                 ← 如果使用了-t参数
 | |-fork()                                   ← daemonize
 |-sigaction()                                ← 注册信号处理函数,包括了SIGINT、SIGTERM、SIGUSR1
 |
 | <<<<<<<<<========= 创建工作线程
 |-do_init()
 | |-plugin_init_all()                        ← 会设置缓存队列的大小
 |   |-uc_init()                              ← 初始化全局变量cache_tree(平衡二叉树)
 |   | ###BEGIN读取所需要的配置参数
 |   |-plugin_register_read()                 ← 如果已经配置CollectInternalStats,则注册一个读取插件
 |   |-fc_chain_get_by_name()                 ← 读取PreCacheChain、PostCacheChain配置参数
 |   |
 |   |-cf_callback()                          ← <<<1>>>调用list_init链表中回调函数,检测插件是否正常
 |   |
 |   |-start_write_threads()                  ← 开启写线程
 |   | |-plugin_write_thread()                ← 启动写线程,真正的线程执行函数
 |   | | | ###BEGIN###while循环调用,判断write_loop是否可用
 |   | | |-plugin_write_dequeue()             ← 从write_queue链表的首部获取一个对象,队列递减
 |   | | | |-pthread_cond_wait()              ← 等待write_cond条件变量,使用write_lock锁
 |   | | |-plugin_dispatch_values_internal()  ← 入参是value_list_t类型
 |   | | | |-c_avl_get()                      ← 从data_sets中获取对应的数据类型,也就是types.db中
 |   | | | |-escape_slashes()                 ← 处理host、plugin、type中可能的转移字符
 |   | | | |-fc_process_chain()               ← 如果pre_cache_chain不为空,则调用该函数
 |   | | | |-uc_update()                      ← 更新缓存cache_tree,入参为data_set_t和value_list_t
 |   | | | | |-FORMAT_VL()                    ← 实际上是调用format_name()将vl中的值生成标示符
 |   | | | | |-c_avl_get()                    ← 利用上述标示符获取cache_entry_t,在此会缓存最近的一次采集数据
 |   | | | | |... ...                         ← 会根据不同的类型进行处理,例如DS_TYPE_GAUGE
 |   | | | | |-uc_check_range()               ← 检查是否在指定的范围内
 |   | | | |-fc_process_chain()  ##OR1##      ← 如果有post_cache_chain则调用
 |   | | | |-fc_default_action() ##OR1##      ← 调用写入插件写入
 |   | | |   |-fc_bit_write_invoke()
 |   | | |     |-plugin_write()               ← 通过list_write链表调用各个组件写入
 |   | | |       |                            ← 当所有插件都写入失败时返回-1,否则返回0
 |   | | |       |-cf_callback()
 |   | | | ###END###while
 |   | | |-plugin_value_list_free()           ← TODODO:是否使用缓存池
 |   | |-set_thread_name()                    ← 设置线程名称writerN
 |   |
 |   |-start_read_threads()                   ← 创建多个读线程,需要保证read_heap!=NULL
 |     |-plugin_read_thread()                 ← 线程实际入口,在一个死循环中执行,详细见后面的解析
 |     | | ###BEGIN###while循环调用,判断read_loop是否可用
 |     | |-pthread_cond_wait()                ← 等待read_cond条件变量,使用read_lock锁
 |     | |-c_heap_get_root()
 |     | | ###END###
 |     |-set_thread_name()                    ← 设置线程名称writerN
 |
 |-test_readall()                             ← 如果使用参数-T,则调用插件读取一行
 | |-plugin_read_all_once()                   ← 调用各个插件的读取函数,测试是否有误
 |   |-c_heap_get_root()
 |   |-rf_callback()                          ← 调用各个插件的读取函数
 |
 |-do_loop()                                  ← 在此包括了时间间隔
 | |-cf_get_default_interval()                ← 获取默认的时间间隔
 | |-cdtime()
 | | ###BEGIN###while循环调用,判断loop是否可用
 | |-plugin_read_all()
 |   |-uc_check_timeout()                     ← 主线程检查是否有插件超时
 |     |-plugin_dispatch_missing()            ← 如果有超时,则调用注册的回调函数
 |
 |-do_shutdown()                              ← 通过INT、TERM信号用于关闭
   |-plugin_shutdown_all()                    ← 调用链表上回调函数
     |-destroy_all_callbacks()

----- 注册简单读取插件
plugin_register_read()                        ← 新建read_func_t对象
 |-plugin_insert_read()
   |-llist_create()                           ← 如果第一次调用,则生成read_list
   |-c_heap_create()                          ← 如果第一次调用,则生成read_heap
   |-llist_search()                           ← 是否重复注册,直接返回
   |-c_heap_insert()                          ← 写入read_heap,通过rf_next_read排序,也即下次要读取时间

----- 各个插件在采集完数据之后用于保存数据
plugin_dispatch_values()
 |-check_drop_value()                         ← 检查是否会丢弃数据
 | |-get_drop_probability()                   ← 用于计算是否丢弃
 |-plugin_write_enqueue()                     ← 直接写入到write_queue链表的末尾
   |-plugin_value_list_clone()                ← 复制一份,会自动填充主机名、采集时间、采样频率
   | |-malloc()
   | |-meta_data_clone()
   |-plugin_get_ctx()                        保存读插件的上下文信息
   |-pthread_mutex_lock()                    对write_lock加锁
   |-pthread_cond_signal()                   向write_cond发送信号
   |-pthread_cond_signal()                    ← 添加到write_queue链表中,并发送write_cond

----- 注册简单写回调函数,保存到list_write链表中
plugin_register_write()
 |-create_register_callback()
   |-calloc()                                 ← 分配插件需要空间
   |-register_callback()                      ← 添加到list_write链表中

----- 注册到链表中,部分需要插件需要执行刷新,如rrdtool、network会注册该函数
plugin_register_flush()
 |-create_register_callback()                 ← 添加到list_flush链表中
 |-plugin_register_complex_read()             ← 如果刷新时间间隔不为0,则调用该插件注册
                                              ← 回调函数plugin_flush_timeout_callback()

读线程

在通过 plugin_register_read() 函数进行注册时,会生成 read_list 和 read_heap 两个全局变量,其中 read_list 只是用来维护注册的读插件,实际用途不大;而 read_heap 类似于堆排序,用于获取最近需要进行读取的插件,也就是说,下个时间点调用那个函数,通过 heap 进行排序。

如果调用插件读取失败,则会采用 double 的退避措施,也就是将下次采集时间乘以 2,直到 max_read_interval 值;读取成功时则会直接恢复原有的采集时间。

static void *plugin_read_thread(void __attribute__((unused)) * args) {
  while (read_loop != 0) {
    // 从read_heap中读取排序在前的对象,一般只是由于多线程之间竞争导致阻塞
    // 不过,还有可能会存在线程数大于插件数的情况,此时就会进入条件等待
    pthread_mutex_lock(&read_lock);
    rf = c_heap_get_root(read_heap);
    if (rf == NULL) {
      pthread_cond_wait(&read_cond, &read_lock);
      pthread_mutex_unlock(&read_lock);
      continue;
    }
    pthread_mutex_unlock(&read_lock);

    // 也就是监控的时间间隔,正常会在插件加载的时候已经配置
    // 默认是采用全局的配置,不过也可以在插件配置中设置自己的参数
    if (rf->rf_interval == 0) {
      rf->rf_interval = plugin_get_interval();
      rf->rf_effective_interval = rf->rf_interval;
      rf->rf_next_read = cdtime();
    }

    // 正常来说我们可以在此执行nsleep()做等待,但是在关闭读线程时,仍使用read_cond
    pthread_mutex_lock(&read_lock);
    // 对于Linux来说,不会有惊群现象,但是对于NetBSD而且CPU>1时会存在问题
    rc = 0;
    while ((read_loop != 0) && (cdtime() < rf->rf_next_read) && rc == 0) {
      rc = pthread_cond_timedwait(&read_cond, &read_lock,
                                  &CDTIME_T_TO_TIMESPEC(rf->rf_next_read));
    }
    rf_type = rf->rf_type;          // 需要持有read_lock读取
    pthread_mutex_unlock(&read_lock);

    // 如上所述,此时可能是需要退出的,在此检查下
    if (read_loop == 0) {
      c_heap_insert(read_heap, rf); // 为了可以正常free,需要将rf重新插入到read_heap中
      break;
    }

    // 在plugin_unregister_read()函数中,已经将该插件删除,此时只需要删除即可
    if (rf_type == RF_REMOVE) {
      DEBUG("plugin_read_thread: Destroying the `%s' callback.", rf->rf_name);
      sfree(rf->rf_name);
      destroy_callback((callback_func_t *)rf);
      rf = NULL;
      continue;
    }

    // OK,开始正式调用插件读取数据了
    DEBUG("plugin_read_thread: Handling `%s'.", rf->rf_name);
    start = cdtime();
    old_ctx = plugin_set_ctx(rf->rf_ctx);
    if (rf_type == RF_SIMPLE) {
      int (*callback)(void);
      callback = rf->rf_callback;
      status = (*callback)();
    } else {
      plugin_read_cb callback;
      assert(rf_type == RF_COMPLEX);
      callback = rf->rf_callback;
      status = (*callback)(&rf->rf_udata);
    }
    plugin_set_ctx(old_ctx);


    // 如果失败则会将下次采集的时间间隔double,当然是有上限的;成功时则会恢复原有的采集时间间隔
    if (status != 0) {
      rf->rf_effective_interval *= 2;
      if (rf->rf_effective_interval > max_read_interval)
        rf->rf_effective_interval = max_read_interval;
    } else {
      rf->rf_effective_interval = rf->rf_interval;
    }
    now = cdtime();
    elapsed = (now - start);  // 计算本次花费的时间
    if (elapsed > rf->rf_effective_interval)
      WARNING(... ...);

    // 计算下次需要调用插件的时间
    rf->rf_next_read += rf->rf_effective_interval;

    // 如果采集时间超过了时间间隔,则立即重新采集
    if (rf->rf_next_read < now) {
      rf->rf_next_read = now;
    }

    // 重新写入到read_heap中,到此插件调用结束
    c_heap_insert(read_heap, rf);
  } /* while (read_loop) */

  pthread_exit(NULL);
  return ((void *)0);
} /* void *plugin_read_thread */

从上述的函数调用可知,collectd 框架不会负责数据采集写入,需要由各个插件负责。

插件采集数据

实际上,在 contrib/examples 目录下有个插件的示例程序;在此选一个比较简单的插件 load,一般最终会通过 plugin_dispatch_values() 函数提交。该函数主要是将数据添加到 write_queue_head 列表中,并发送 write_cond。

插件实现

exec

这是一个通用的插件,不过每次执行时都需要 fork 一个进程,如果需要多次采集那么其性能会变的很差,所以对于一些特定的插件,如 Python 建议不要使用该插件。

如下是一个 exec 插件的配置示例。

Loadplugin exec
<Plugin exec>
  Exec "user:group" "program"
  Exec "some-user" "/path/to/another/binary" "arg0" "arg1"
  NotificationExec "user" "/path/to/handle_notification"
</Plugin>

以 bash 插件为例,直接通过 Plain text protocol 方式向 Collectd 发送数据。

#!/bin/bash

HOSTNAME="${COLLECTD_HOSTNAME:-`hostname -f`}"
INTERVAL="${COLLECTD_INTERVAL:-10}"
PORT=6379

while sleep "$INTERVAL"; do
    info=$(echo info|nc -w 1 127.0.0.1 $PORT)
    connected_clients=$(echo "$info"|awk -F : '$1 == "connected_clients" {print $2}')
    connected_slaves=$(echo "$info"|awk -F : '$1 == "connected_slaves" {print $2}')
    uptime=$(echo "$info"|awk -F : '$1 == "uptime_in_seconds" {print $2}')
    used_memory=$(echo "$info"|awk -F ":" '$1 == "used_memory" {print $2}'|sed -e 's/\r//')
    changes_since_last_save=$(echo "$info"|awk -F : '$1 == "changes_since_last_save" {print $2}')
    total_commands_processed=$(echo "$info"|awk -F : '$1 == "total_commands_processed" {print $2}')
    keys=$(echo "$info"|egrep -e "^db0"|sed -e 's/^.\+:keys=//'|sed -e 's/,.\+//')

    echo "PUTVAL $HOSTNAME/redis-$PORT/memcached_connections-clients interval=$INTERVAL N:$connected_clients"
    echo "PUTVAL $HOSTNAME/redis-$PORT/memcached_connections-slaves interval=$INTERVAL N:$connected_slaves"
    echo "PUTVAL $HOSTNAME/redis-$PORT/uptime interval=$INTERVAL N:$uptime"
    echo "PUTVAL $HOSTNAME/redis-$PORT/df-memory interval=$INTERVAL N:$used_memory:U"
    echo "PUTVAL $HOSTNAME/redis-$PORT/files-unsaved_changes interval=$INTERVAL N:$changes_since_last_save"
    echo "PUTVAL $HOSTNAME/redis-$PORT/memcached_command-total interval=$INTERVAL N:$total_commands_processed"
    echo "PUTVAL $HOSTNAME/redis-$PORT/memcached_items-db0 interval=$INTERVAL N:$keys"
done

在配置文件中添加如下内容即可。

<Plugin exec>
    Exec nobody "/etc/collectd/redis.sh"
</Plugin>

python

Python 中有很多不错的库,例如可以通过 pypi/collectd 库,可以向 collectd 的服务端发送数据。而 collectd 的 Python 插件实现,实际上是在插件中以 C 语言的形式实现了一个 collectd 插件。

示例

Collectd 的插件回调函数同样可以在 Python 中调用,也即 configinitreadwritelogflushshutdown 等接口,当然需要在其前添加 register_*

import collectd

PATH = '/proc/uptime'

def config_func(config):
    path_set = False
    for node in config.children:
        key = node.key.lower()
        val = node.values[0]

        if key == 'path':
            global PATH
            PATH = val
            path_set = True
        else:
            collectd.info('cpu_temp plugin: Unknown config key "%s"' % key)

    if path_set:
        collectd.info('cpu_temp plugin: Using overridden path %s' % PATH)
    else:
        collectd.info('cpu_temp plugin: Using default path %s' % PATH)

def read_func():
    # Read raw value
    with open(PATH, 'rb') as f:
        temp = f.read().strip()

    # Convert to degrees celsius
    deg = float(int(temp)) / 1000

    # Dispatch value to collectd
    val = collectd.Values(type='temperature')
    val.plugin = 'cpu_temp'
    val.dispatch(values=[deg])

collectd.register_config(config_func)
collectd.register_read(read_func)

然后在配置文件中添加如下内容即可。

LoadPlugin python
<Plugin python>
    ModulePath "/opt/collectd_plugins"
    Import "cpu_temp"
    <Module cpu_temp>
        Path "/sys/class/thermal/thermal_zone0/temp"
    </Module>
</Plugin>

java

与 Python 相似,同样是内嵌了 JVM ,并将 API 暴露给 JAVA 程序,这样就不需要每次重新调用生成新的进程以及启动 JVM 。

在 CentOS 中,编译前需要安装开发包,如 java-1.8.0-openjdk-devel,在通过 configure 命令进行配置时需要添加 --enable-java --with-java=$JAVA_HOME 参数;除了上述两个参数外,还可以通过命令行指定 JAVA 相关的参数,示例如下:

$ ./configure --with-java=$JAVA_HOME JAVA_CFLAGS="-I$JAVA_HOME/include -I$JAVA_HOME/include/linux" \
    JAVA_CPPFLAGS="-I$JAVA_HOME/include -I$JAVA_HOME/include/linux"                                \
    JAVA_LDFLAGS="-I$JAVA_HOME/include -I$JAVA_HOME/include/linux"                                 \
    JAVA_LIBS="-I$JAVA_HOME/include" JAR="/path/to/jar" JAVAC="/path/to/javac"                     \
    --enable-java=force

编译完成后,会生成 bindings/java/.libs/{collectd-api.jar,generic-jmx.jar} 两个 jar 包;当通过 RPM 包安装时,默认会安装到 /usr/share/collectd/java/ 目录下。

如下是一个 java 配置内容。

<Plugin "java">
  JVMArg "-verbose:jni"
  JVMArg "-Djava.class.path=/lib/collectd/bindings/java"

  LoadPlugin "org.collectd.java.Foobar"
  <Plugin "org.collectd.java.Foobar">
    # To be parsed by the plugin
  </Plugin>
</Plugin>

以及采集程序示例。

import org.collectd.api.Collectd;
import org.collectd.api.ValueList;
import org.collectd.api.CollectdReadInterface;

public class Foobar implements CollectdReadInterface
{
  public Foobar ()
  {
    Collectd.registerRead ("Foobar", this);
  }

  public int read ()
  {
    ValueList vl;

    /* Do something... */

    Collectd.dispatchValues (vl);
  }
}

杂项

libtoolize

如果报 libtoolize: 'COPYING.LIB' not found in '/usr/share/libtool/libltdl' 错误。

依赖 libtool-ltdl-devel 库,一个 GUN 提供的库,类似于 POSIX 的 dlopen() ,不过据说更简单且强大;详细可以参考官方文档 Using libltdl

另外,需要注意的是,该库并不能保证线程安全。

BUG-FIX

日志输出

如果将日志输出到文件时,可能会报如下的错误。

collectd -C collectd.conf -t
logfile plugin: fopen (/opt/collectd/var/log/collectd.log) failed: No such file or directory

修改 logfile.c 中的默认文件,设置为标准错误输出 STDERR

//#define DEFAULT_LOGFILE LOCALSTATEDIR "/log/collectd.log"
#define DEFAULT_LOGFILE "stderr"

FAQ

1. meta data 的作用是?

meta data (meta_data_t) 用于将一些数据附加到已经存在的结构体上,如 values_list_t,对应的是 KV 结构,其中一个使用场景是 network 插件,用于标示从哪个插件采集来的,防止出现循环,也用于该插件的 Forward 选项。

参考

一些与 collectd 相关的工具,可以参考 Related sites



如果喜欢这里的文章,而且又不差钱的话,欢迎打赏个早餐 ^_^


About This Blog

Recent Posts

Categories

Related Links

  • RTEMS
    RTEMS
  • GNU
  • Linux Kernel
  • Arduino

Search


This Site was built by Jin Yang, generated with Jekyll, and hosted on GitHub Pages
©2013-2018 – Jin Yang