[Big Data in Practice] Game Event Processing System (3): Kafka as the Message Middleware

Preface

The previous article covered log processing, and the resulting events are ultimately written to a Kafka cluster. This article therefore walks through setting up a simple Kafka cluster. Once it is done, the system should be able to run the full path of log collection, processing, and output to Kafka, and we should be able to verify the messages with Kafka's own tooling.

Starting ZooKeeper

Start command:

bin/zookeeper-server-start.sh config/zookeeper.properties

The main parameters in the zookeeper.properties configuration file are:

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
  • dataDir: the directory where the in-memory database snapshots and the transaction log of updates are stored.
  • clientPort: the port the ZooKeeper service listens on.
  • maxClientCnxns: the per-IP limit on the number of connections; leaving it unset or setting it to 0 means no limit. Note that Kafka broker connections count toward this limit as well, so with maxClientCnxns = 1 you could not run both a Kafka server and a Kafka producer against ZooKeeper from the same machine.

Starting the Kafka Server

Start command:

bin/kafka-server-start.sh config/server.properties

Once this succeeds, a broker is running. The broker is configured through server.properties; the main settings are:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
# If unset, the host name from java.net.InetAddress.getCanonicalHostName() is used,
# with port 9092 and the PLAINTEXT protocol. Other protocols include SSL,
# SASL_PLAINTEXT and SASL_SSL.
listeners=PLAINTEXT://localhost:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://localhost:9092

# Maps listener names to security protocols, the default is for them to be the same.
# See the config documentation for more details.
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network
# and sending responses to the network.
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O.
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server, in bytes (default 100 KB).
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server, in bytes (default 100 KB).
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM),
# default 100 MB.
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files, i.e. the
# messages Kafka receives (not the server's own application logs).
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across the brokers.
num.partitions=3

# The number of threads per data directory to be used for log recovery at startup and
# flushing at shutdown. The default is 1; increasing it is recommended when the data
# dirs are located on a RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################

# The replication factor for the group metadata internal topics "__consumer_offsets"
# and "__transaction_state".
# For anything other than development testing, a value greater than 1 is recommended
# to ensure availability, such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync()
# to sync the OS cache lazily. The following configurations control the flush of data
# to disk. There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush
#       does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small
#       flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a
# period of time or every N messages (or both). This can be done globally and
# overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk.
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush.
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has
# accumulated. A segment will be deleted whenever *either* of these criteria are met.
# Deletion always happens from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age (default: one week).
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the
# remaining segments drop below log.retention.bytes. Functions independently of
# log.retention.hours. The overall limit for a topic is roughly
# partitions * log.retention.bytes; -1 means no size limit.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment
# will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted
# according to the retention policies.
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated list of host:port pairs, each corresponding to a zk
# server, e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper.
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the
# GroupCoordinator will delay the initial consumer rebalance. The rebalance will be
# further delayed by the value of group.initial.rebalance.delay.ms as new members
# join the group, up to a maximum of max.poll.interval.ms. The default is 3 seconds.
# We override it to 0 here for a better out-of-the-box development and testing
# experience; in production the 3-second default is more suitable, as it helps avoid
# unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

This config file gives a rough picture of what Kafka can do. Many of the settings I don't fully understand yet; I plan to study them separately later.

For a simple trial run, the following few settings are enough:

  • broker.id=0
  • listeners=PLAINTEXT://127.0.0.1:9092
  • advertised.listeners=PLAINTEXT://127.0.0.1:9092
  • num.partitions=3 (to experiment with multiple partitions)
  • zookeeper.connect=localhost:2181 (connects to ZooKeeper)

Starting a second broker

Copy server.properties to server-1.properties and adjust the settings, for example:

  • broker.id=1
  • listeners=PLAINTEXT://127.0.0.1:9093
  • advertised.listeners=PLAINTEXT://127.0.0.1:9093
  • num.partitions=3 (to experiment with multiple partitions)
  • zookeeper.connect=localhost:2181 (connects to ZooKeeper)

Then run the start command:

bin/kafka-server-start.sh config/server-1.properties

Topic management

Creating a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic game-score
  • This creates a new topic named game-score.
  • --replication-factor is the number of different brokers on which each partition of the topic is stored.
  • --partitions 2 means the topic has 2 partitions (a programmatic equivalent is sketched right after this list).
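
If you prefer managing topics from code rather than the shell scripts, the kafka-clients AdminClient API (available since Kafka 0.11) can create the same topic. The sketch below is only illustrative: the class name TopicAdmin and its package are made up, and it assumes the broker from this article is reachable at localhost:9092.

package admin;   // hypothetical package name

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TopicAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 2 partitions, replication factor 2 -- mirrors the CLI command above
            NewTopic topic = new NewTopic("game-score", 2, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}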

Listing topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

You should see:

game-score

Describing a topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic game-score

The output looks like this:

Topic:game-score    PartitionCount:2    ReplicationFactor:2    Configs:
    Topic: game-score    Partition: 0    Leader: 1    Replicas: 1,0    Isr: 1,0
    Topic: game-score    Partition: 1    Leader: 0    Replicas: 0,1    Isr: 0,1
  • Leader: the broker currently responsible for all reads and writes of that partition; every replica of a partition can potentially be elected leader.
  • Replicas: the list of brokers holding a copy of the partition, regardless of whether they are the leader or even alive.
  • Isr: short for in-sync replicas, the subset of Replicas that are alive and fully caught up with the leader (the sketch after this list reads the same information via the AdminClient API).
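
For completeness, the leader/replica/ISR information shown by --describe can also be read programmatically with the same AdminClient API. This is a minimal sketch; the class name TopicInspector is hypothetical and it assumes the kafka-clients dependency added later in this article.

package admin;   // hypothetical package name

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.Properties;

public class TopicInspector {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Fetch the description of the game-score topic and print per-partition metadata
            TopicDescription desc = admin.describeTopics(Collections.singletonList("game-score"))
                    .all().get().get("game-score");
            for (TopicPartitionInfo p : desc.partitions()) {
                System.out.println("partition " + p.partition()
                        + " leader=" + p.leader().id()
                        + " replicas=" + p.replicas()
                        + " isr=" + p.isr());
            }
        }
    }
}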

Deleting a topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic game-score

The topic is not actually removed; it is only marked for deletion:

Topic game-score is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Changing a topic's partition count

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic game-score --partitions 2
  • Experimentation shows that --alter cannot be used to change the --replication-factor.

Viewing per-partition message information for a topic

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group testgroup --topic test0 --zookeeper 127.0.0.1:2181

Starting a consumer

Start a console consumer to check whether messages are arriving at the Kafka cluster:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic game-score --from-beginning

This command dumps the messages and prints them to the console.

Configuring the Logstash Kafka output

To have Logstash send messages to the Kafka cluster, the kafka plugin is used in Logstash's output section.

The configuration is as follows:

output {
    kafka {
        # topic ID
        topic_id => "game-score"
        # address of the Kafka service
        bootstrap_servers => "127.0.0.1:9092"
        # be sure to specify the output format
        codec => "json"
    }
}

With this in place, start Filebeat, Logstash, and Kafka, append new entries to the monitored log file, and the messages should show up in the Kafka consumer console.

Here is the result, as a small bit of encouragement to myself:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic game-score --from-beginning
{"bet_count":"1","room_id":"002","score_type":"balance","game_time":"14:26:37","desk_id":"512","game_date":"2015-11-02","game_id":"2015-11-02_14:26:37_ÐÂÊÖÇø_1_002_512","game":"PDK","beat":{"name":"admindeMacBook-Pro-2.local","version":"6.2.4","hostname":"admindeMacBook-Pro-2.local"},"tax":0,"time":"2015-11-02 14:26:54,355","tags":["beats_input_codec_plain_applied"],"offset":21444,"users":[{"username":"ly6","win":15}],"bet_name":"ÐÂÊÖÇø","prospector":{"type":"log"},"source":"/Users/admin/Documents/workspace/elk/filebeat-6.2.4-darwin-x86_64/hjd_IScoreService.log"}

Implementing a simple Kafka consumer

Add the following dependency to pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.1</version>
</dependency>

GameScoreConsumer.java looks like this:

package consumers;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class GameScoreConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "game-score-consumers");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("game-score"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value()
                        + ") at offset " + record.offset());
            }
        }
    }
}
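
A note on the consumer above: enable.auto.commit=true means offsets are committed automatically every auto.commit.interval.ms, which is convenient for this experiment but can re-deliver or skip records around a crash; for stricter processing guarantees you would normally disable auto-commit and call commitSync() after handling each batch.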

Start it, append new entries to the log file, and the consumer will receive the corresponding messages.
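
If you want to exercise the consumer without running the whole Filebeat/Logstash pipeline, a minimal producer can push a test message into the topic by hand. This is only a sketch: the class name GameScoreTestProducer and the message body are made up, and it assumes the same kafka-clients dependency and a broker at localhost:9092.

package consumers;   // hypothetical; place the class wherever is convenient

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class GameScoreTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send one hand-written JSON event; in the real pipeline Logstash produces these.
            producer.send(new ProducerRecord<>("game-score", "test-key", "{\"game\":\"PDK\",\"tax\":0}"));
            producer.flush();
        }
    }
}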

Summary

At this point, the whole flow from log collection and processing to storage in the Kafka message middleware is working end to end. The [Big Data in Practice] game event processing system series leans toward hands-on experimentation, so it does not go very deep into theory; that may become a separate series later.
