Kafka 简介

2016-08-27 Saturday     linux , misc

简介

Kafka 中可以将 Topic 从物理上划分成一个或多个分区 (Partition),每个分区在物理上对应一个文件夹,以 topicName_partitionIndex 的命名方式命名,该文件夹下存储这个分区的所有消息 (.log) 和索引文件 (.index),这使得 Kafka 的吞吐率可以水平扩展。

在通过命令行创建 Topic 时,可以使用 --partitions <numPartitions> 参数指定分区数;也可以在 server.properties 配置文件中配置参数 num.partitions 来指定默认的分区数。需要注意的是,在为 Topic 创建分区时,分区数最好是 broker 数量的整数倍。

Kafka 安装

直接从 kafka.apache.org 下载二进制安装包,然后解压。

单机测试

----- 1. 通过脚本启动单节点Zookeeper实例
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

----- 2. 启动Kafka服务
$ bin/kafka-server-start.sh config/server.properties >logs/kafka-start.logs 2>&1 &

----- 3. 创建单分区单副本的topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

----- 4. 查看topic
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test

----- 5. 发送消息
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Hello world!
Hello Kafka!

----- 6. 消费消息,接收消息并在终端打印,--zookeeper参数在逐渐取消,建议使用后者
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Hello world!
Hello Kafka!

单机多broker配置

利用单节点部署多个 broker,不同的 broker 设置不同的 id 、监听端口、日志目录。

----- 1. 新增配置文件,修改上述的三个主要配置
$ cp config/server.properties config/server-1.properties
$ cat config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

----- 2. 启动Kafka服务
$ bin/kafka-server-start.sh config/server-1.properties &

librdkafka

rd_kafka_produce()                      ← 发送消息
 |-rd_kafka_msg_new()
   |-rd_kafka_msg_new0() 创建消息并初始化,包括消息长度、标记位、timeout等内容
   |-rd_kafka_msg_partitioner() 分区并添加到队列
   | |-rd_kafka_toppar_get() 获取partition
   | |-rd_kafka_toppar_enq_msg() 加入队列,等待broker线程取出
   |   |-rd_kafka_msgq_enq() 真正添加到topic partition队列中,注意需要进行加锁
   |     |-TAILQ_INSERT_TAIL() 添加到队列末尾
   |-rd_kafka_msg_destroy()
rd_kafka_broker_add()

rd_kafka_new() 创建新的实例,会返回rd_kafka_t结构体
 |-rd_kafka_cgrp_new() 如果是消费者会创建分组
 |-rd_kafka_thread_main() 为生产者、消费者启动单独线程处理 rd_kafka_thread_main()
 | |-rd_snprintf() 将线程名称设置为main
 | |-rd_kafka_q_serve() 从rk_ops队列中读取消息
 |   |-rd_kafka_op_handle() 处理完成后调用回调函数
 |-rd_kafka_broker_add() 创建内部broker线程,也就是 rd_kafka_broker_thread_main()

rd_kafka_broker_thread_main()
 |-rd_kafka_broker_terminating() 如果没有关闭
 |-rd_kafka_broker_connect() 在初始化以及STATE_DOWN时,不断重试链接
 | |-rd_kafka_broker_resolve()
 | | |-rd_getaddrinfo()
 | |-rd_kafka_transport_connect() 设置网络异步通讯
 | | |-rd_fd_set_nonblocking()
 | | |-rd_kafka_transport_poll_set()
 | |-rd_kafka_broker_set_state()
 |
 |-rd_kafka_broker_producer_serve() 处于STATE_UP时根据不同的类型调用不同函数
 | |-rd_kafka_broker_terminating()
 | |-rd_kafka_toppar_producer_serve() 会通过TAILQ_FOREACH()从队列中读取,然后调用该函数
     |-rd_kafka_msgq_concat() 如果有需要发送的消息,则直接将消息从rktp->rktp_msgq移动到rktp->rktp_xmit_msgq,并清空前者
     |-rd_kafka_msgq_age_scan()
  |-rd_kafka_broker_produce_toppar()
    |-rd_kafka_buf_new() 新建发送缓冲区
 | |-rd_kafka_broker_serve() 从队列中读取消息,并将数据发送到网络
 |   |-rd_kafka_q_pop() 会返回具体的操作类型
 |   | |-rd_kafka_q_pop_serve()
 |   |-rd_kafka_broker_op_serve() 根据不同的操作类型调用不同函数接口
 |   |
 |   |-rd_kafka_transport_io_serve()
 |     |-rd_kafka_transport_poll() 调用操作系统的poll()系统调用接口
 |     |-rd_kafka_transport_poll_clear()
 |     |-rd_kafka_transport_io_event() 处理返回的IO事件
 |       |-rd_kafka_send() 例如发送
 |       |-rd_kafka_recv() 以及接收
 |-rd_kafka_broker_consumer_serve()

这也就意味着应用程序将消息发送给 rdkafka,然后 rdkafka 会直接将消息保存到队列中,并由其它线程异步发送给 broker 。

在调用 rd_kafka_new() 函数时,会根据入参是消费者还是生产者返回 rd_kafka_t 对象,此时会同时创建一个主处理线程,也就是 rd_kafka_thread_main()

在主线程中会从 rd_kafka_t.rk_ops 队列中逐一读取消息,然后根据不同的操作类型 (通过rd_kafka_op_type_t定义) 分别进行处理,其中操作通过 rd_kafka_op_t 定义,操作类型对应了其中的 rko_type 成员,操作执行完后调用 rd_kafka_op_handle() 回调。

参考



如果喜欢这里的文章,而且又不差钱的话,欢迎打赏个早餐 ^_^


About This Blog

Recent Posts

Categories

Related Links

  • RTEMS
    RTEMS
  • GNU
  • Linux Kernel
  • Arduino

Search


This Site was built by Jin Yang, generated with Jekyll, and hosted on GitHub Pages
©2013-2019 – Jin Yang