【kafka KSQL】Game Log Statistical Analysis (1)
- Author: 五速梦信息网
- Date: 2026-04-04 13:56
Using game settlement logs as an example, this post walks through running statistical analysis on logs with KSQL.
Start Confluent
cd ~/Documents/install/confluent-5.0.1/
bin/confluent start
List the Kafka topics
bin/kafka-topics --list --zookeeper localhost:2181
Create a topic to receive the game settlement logs
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic score-normalized
Write a log entry into the topic with the console producer
bin/kafka-console-producer --broker-list localhost:9092 --topic score-normalized
>
{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1区_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
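The console producer sends each line you type as one message, so a log entry has to be serialized as single-line JSON. A minimal Python sketch that builds such a line from the sample record above. The script only prints the line, and the file name in the usage note below is hypothetical.

```python
import json

# Settlement record copied from the sample log entry above.
record = {
    "cost": 7,
    "epoch": 1512342568296,
    "gameId": "2017-12-04_07:09:28_高手1区_200_015_185175",
    "gameType": "situan",
    "gamers": [
        {"balance": 4405682, "delta": -60, "username": "0791754000"},
        {"balance": 69532, "delta": -60, "username": "70837999"},
        {"balance": 972120, "delta": -60, "username": "abc6378303"},
        {"balance": 23129, "delta": 180, "username": "a137671268"},
    ],
    "reason": "xiayu",
}


def to_producer_line(rec: dict) -> str:
    """Serialize a record as one line of JSON with no embedded newlines.

    ensure_ascii=False keeps the Chinese zone name readable instead of
    escaping it as \\uXXXX sequences.
    """
    return json.dumps(rec, ensure_ascii=False, separators=(",", ":"))


if __name__ == "__main__":
    print(to_producer_line(record))
```

For example, if the sketch were saved as a hypothetical make_log.py: python make_log.py | bin/kafka-console-producer --broker-list localhost:9092 --topic score-normalized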
Check with the console consumer that the log was written correctly
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic score-normalized --from-beginning
;; Output:
{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1区_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
Start the KSQL CLI
bin/ksql http://localhost:8088
The KSQL startup banner appears, followed by the interactive prompt.
List the Kafka topics from the KSQL prompt
ksql> show topics;
Print the messages in the topic
PRINT 'score-normalized';
Output:
Format:STRING
19-1-5 11:59:31 PM , NULL , {"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_\xE9\xAB\x98\xE6\x89\x8B1\xE5\x8C\xBA_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
Where:
- 19-1-5 11:59:31 PM is the message timestamp (when the message was written to Kafka);
- NULL is the message key: kafka-console-producer sends messages without a key, so the key is NULL.
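The \xE9\xAB... sequences in the PRINT output are the raw UTF-8 bytes of the Chinese zone name 高手1区, shown as escapes. A short Python sketch that turns the escaped gameId from the sample back into readable text. The decoding approach is only an illustration; it is not something KSQL provides.

```python
def decode_print_escapes(text: str) -> str:
    """Turn '\\xNN' byte escapes, as shown by PRINT, back into UTF-8 text."""
    # 1. Interpret each \xNN escape as the single character U+00NN.
    chars = text.encode("ascii").decode("unicode_escape")
    # 2. latin-1 maps U+0000..U+00FF one-to-one back to bytes 0x00..0xFF.
    raw = chars.encode("latin-1")
    # 3. Those bytes are the UTF-8 encoding of the original string.
    return raw.decode("utf-8")


# gameId value of the sample log entry, as PRINT displays it.
escaped = r"2017-12-04_07:09:28_\xE9\xAB\x98\xE6\x89\x8B1\xE5\x8C\xBA_200_015_185175"
print(decode_print_escapes(escaped))
```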
Create a stream from the topic score-normalized
CREATE STREAM SCORE_EVENT \
 (epoch BIGINT, \
  gameType VARCHAR, \
  cost INTEGER, \
  gamers ARRAY< \
    STRUCT< \
      username VARCHAR, \
      balance BIGINT, \
      delta BIGINT \
    > \
  >, \
  gameId VARCHAR, \
  tax BIGINT, \
  reason VARCHAR) \
 WITH ( KAFKA_TOPIC='score-normalized', \
        VALUE_FORMAT='JSON', \
        TIMESTAMP='epoch');
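TIMESTAMP='epoch' tells KSQL to take the event time (ROWTIME) from the epoch field, a Unix timestamp in milliseconds, instead of from the Kafka message timestamp.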
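To see what the SCORE_EVENT schema and TIMESTAMP='epoch' mean for a single record, here is a plain-Python sketch (not KSQL) that maps the JSON onto typed fields mirroring the declared columns and converts epoch from milliseconds into a datetime. The UTC+8 offset is an assumption, inferred from the time embedded in gameId; the sample is trimmed to two players for brevity.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class Gamer:  # STRUCT<username VARCHAR, balance BIGINT, delta BIGINT>
    username: str
    balance: int
    delta: int


@dataclass
class ScoreEvent:  # columns of the SCORE_EVENT stream
    epoch: int
    gameType: str
    cost: int
    gamers: List[Gamer]
    gameId: str
    tax: Optional[int]  # absent from the sample log, so it becomes None (null)
    reason: str


def parse_score_event(line: str) -> ScoreEvent:
    d = json.loads(line)
    return ScoreEvent(
        epoch=d["epoch"],
        gameType=d["gameType"],
        cost=d["cost"],
        gamers=[Gamer(g["username"], g["balance"], g["delta"]) for g in d["gamers"]],
        gameId=d["gameId"],
        tax=d.get("tax"),
        reason=d["reason"],
    )


def event_time(ev: ScoreEvent, tz=timezone(timedelta(hours=8))) -> datetime:
    # epoch is milliseconds since 1970-01-01 UTC.
    return datetime.fromtimestamp(ev.epoch / 1000, tz=tz)


sample = ('{"cost":7,"epoch":1512342568296,'
          '"gameId":"2017-12-04_07:09:28_高手1区_200_015_185175","gameType":"situan",'
          '"gamers":[{"balance":4405682,"delta":-60,"username":"0791754000"},'
          '{"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}')
ev = parse_score_event(sample)
print(event_time(ev).strftime("%Y-%m-%d %H:%M:%S"))  # 2017-12-04 07:09:28
```

The printed time matches the 07:09:28 in gameId, which supports the UTC+8 assumption.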
Drop a stream
DROP STREAM stream_name;
If a persistent query is still reading from or writing into the stream, the drop fails with an error like:
Cannot drop USER_SCORE_EVENT.
The following queries read from this source: [].
The following queries write into this source: [CSAS_USER_SCORE_EVENT_2, InsertQuery_4, InsertQuery_5, InsertQuery_3].
You need to terminate them before dropping USER_SCORE_EVENT.
Stop those queries with TERMINATE first:
TERMINATE CSAS_USER_SCORE_EVENT_2;
TERMINATE InsertQuery_4;
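When several INSERT INTO queries feed one stream, copying the query IDs by hand gets tedious. A small hypothetical Python helper that extracts the IDs from the error text shown above and prints the matching TERMINATE statements. It only parses the message format quoted in this post; other KSQL versions may word the error differently.

```python
import re
from typing import List


def terminate_statements(error_text: str) -> List[str]:
    """Build a TERMINATE statement for every query listed in a DROP error."""
    ids: List[str] = []
    # Matches both "read from this source: [...]" and "write into this source: [...]".
    for group in re.findall(r"this source: \[([^\]]*)\]", error_text):
        ids.extend(q.strip() for q in group.split(",") if q.strip())
    return [f"TERMINATE {q};" for q in ids]


error = """Cannot drop USER_SCORE_EVENT.
The following queries read from this source: [].
The following queries write into this source: [CSAS_USER_SCORE_EVENT_2, InsertQuery_4, InsertQuery_5, InsertQuery_3].
You need to terminate them before dropping USER_SCORE_EVENT."""

for stmt in terminate_statements(error):
    print(stmt)
```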
Read from the earliest records
ksql> SET 'auto.offset.reset' = 'earliest';
Select all data from the stream
ksql> SELECT * FROM SCORE_EVENT;
Output:
1546702389664 | null | 1512342568296 | situan | 7 | [{USERNAME=0791754000, BALANCE=4405682, DELTA=-60}, {USERNAME=70837999, BALANCE=69532, DELTA=-60}, {USERNAME=abc6378303, BALANCE=972120, DELTA=-60}, {USERNAME=a137671268, BALANCE=23129, DELTA=180}] | 2017-12-04_07:09:28_高手1区_200_015_185175 | null | xiayu
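Where:
- the first two columns are the implicit ROWTIME and ROWKEY columns;
- tax is null because the log entry has no tax field;
- the date the game was played, 2017-12-04, is the first 10 characters of gameId.
;; Add a game_date field for the statistics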
CREATE STREAM SCORE_EVENT_WITH_DATE AS \
SELECT SUBSTRING(gameId, 0, 10) AS game_date, * \
FROM SCORE_EVENT;
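In the KSQL version used here (5.0.x), SUBSTRING(gameId, 0, 10) appears to treat 0 as a 0-based start index and 10 as an exclusive end index, which is how it returns the 10-character date; later releases changed the argument semantics, so check the function reference for your version. The Python equivalent is a plain slice, sketched below with hypothetical helper names.

```python
def game_date(game_id: str) -> str:
    # Slice [0:10] keeps characters 0..9, i.e. the "YYYY-MM-DD" prefix.
    return game_id[0:10]


def with_date(event: dict) -> dict:
    # Mirrors SELECT SUBSTRING(gameId, 0, 10) AS game_date, * : new column first.
    return {"game_date": game_date(event["gameId"]), **event}


print(game_date("2017-12-04_07:09:28_高手1区_200_015_185175"))  # 2017-12-04
```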
Count the games played on 2017-12-04:
SELECT game_date, COUNT(*) \
FROM SCORE_EVENT_WITH_DATE \
WHERE game_date = '2017-12-04' AND reason = 'game' \
GROUP BY game_date;
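The query above filters first (WHERE) and then counts per group (GROUP BY game_date). A Python sketch of the same logic over a finite list of hypothetical events; note that the KSQL query is continuous and emits an updated count as each matching record arrives, whereas this function returns only the final count.

```python
from collections import Counter
from typing import Dict, Iterable


def games_per_date(events: Iterable[dict], date: str, reason: str = "game") -> Dict[str, int]:
    counts: Counter = Counter()
    for ev in events:
        d = ev["gameId"][0:10]                      # game_date
        if d == date and ev["reason"] == reason:    # WHERE clause
            counts[d] += 1                          # GROUP BY game_date, COUNT(*)
    return dict(counts)


# Hypothetical events; only gameId and reason matter here.
events = [
    {"gameId": "2017-12-04_07:09:28_a", "reason": "game"},
    {"gameId": "2017-12-04_08:00:00_b", "reason": "game"},
    {"gameId": "2017-12-04_09:00:00_c", "reason": "xiayu"},
    {"gameId": "2017-12-05_07:00:00_d", "reason": "game"},
]
print(games_per_date(events, "2017-12-04"))  # {'2017-12-04': 2}
```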
At the time of writing, KSQL does not support queries like the following:
SELECT COUNT(*) \
FROM SCORE_EVENT \
WHERE gameId LIKE '2017-12-04%';
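Filtering on the game_date column serves as the workaround: because every gameId starts with a fixed-width YYYY-MM-DD date, comparing the first 10 characters matches exactly the rows the prefix pattern LIKE '2017-12-04%' would match. A quick Python check of that equivalence on hypothetical IDs, including the case where it breaks (a prefix that is not exactly 10 characters long).

```python
def like_prefix(value: str, prefix: str) -> bool:
    # What gameId LIKE '<prefix>%' would match.
    return value.startswith(prefix)


def date_equals(value: str, date: str) -> bool:
    # What game_date = '<date>' matches, with game_date = SUBSTRING(gameId, 0, 10).
    return value[0:10] == date


ids = ["2017-12-04_07:09:28_x", "2017-12-05_00:00:01_y", "2017-12-04_23:59:59_z"]
same = all(like_prefix(i, "2017-12-04") == date_equals(i, "2017-12-04") for i in ids)
print(same)  # True
```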
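Count the total number of distinct players who took part in games
Because a single log entry holds the results of several players, first split each player out into a separate event.
Create the USER_SCORE_EVENT stream from the first player (gamers[0]), then append the other three players with INSERT INTO: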
CREATE STREAM USER_SCORE_EVENT AS \
SELECT epoch, gameType, cost, gameId, tax, reason, gamers[0]->username AS username, gamers[0]->balance AS balance, gamers[0]->delta AS delta \
FROM SCORE_EVENT;
INSERT INTO USER_SCORE_EVENT \
SELECT epoch, gameType, cost, gameId, tax, reason, gamers[1]->username AS username, gamers[1]->balance AS balance, gamers[1]->delta AS delta \
FROM SCORE_EVENT;
INSERT INTO USER_SCORE_EVENT \
SELECT epoch, gameType, cost, gameId, tax, reason, gamers[2]->username AS username, gamers[2]->balance AS balance, gamers[2]->delta AS delta \
FROM SCORE_EVENT;
INSERT INTO USER_SCORE_EVENT \
SELECT epoch, gameType, cost, gameId, tax, reason, gamers[3]->username AS username, gamers[3]->balance AS balance, gamers[3]->delta AS delta \
FROM SCORE_EVENT;
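The four statements above hard-code gamers[0] to gamers[3], so they assume every game has exactly four players. A Python sketch of the same flattening, written as a loop over the array so it does not depend on the table size. This is a model of the data transformation only, not of how KSQL executes it.

```python
from typing import Dict, List

# Columns copied unchanged from the game-level event into each per-player row.
PASSTHROUGH = ("epoch", "gameType", "cost", "gameId", "tax", "reason")


def explode_gamers(event: dict) -> List[Dict]:
    """One output row per element of event['gamers'] (like the CSAS + INSERT INTO chain)."""
    rows = []
    for g in event["gamers"]:
        row = {k: event.get(k) for k in PASSTHROUGH}  # a missing tax becomes None
        row.update(username=g["username"], balance=g["balance"], delta=g["delta"])
        rows.append(row)
    return rows


event = {
    "cost": 7, "epoch": 1512342568296, "gameType": "situan", "reason": "xiayu",
    "gameId": "2017-12-04_07:09:28_高手1区_200_015_185175",
    "gamers": [
        {"balance": 4405682, "delta": -60, "username": "0791754000"},
        {"balance": 69532, "delta": -60, "username": "70837999"},
        {"balance": 972120, "delta": -60, "username": "abc6378303"},
        {"balance": 23129, "delta": 180, "username": "a137671268"},
    ],
}
for r in explode_gamers(event):
    print(r["username"], r["delta"])
```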
Re-key the stream by username so that per-player JOINs and aggregations work on the player key:
CREATE STREAM USER_SCORE_EVENT_REKEY AS \
SELECT * FROM USER_SCORE_EVENT \
PARTITION BY username;
Output:
ksql> SELECT * FROM USER_SCORE_EVENT_REKEY;
4000 | lzc | 4000 | situan | 7 | 2017-12-04_07:09:28_高手2区_500_015_185175 | null | game | lzc | 972120 | -60
4000 | lzb | 4000 | situan | 7 | 2017-12-04_07:09:28_高手2区_500_015_185175 | null | game | lzb | 69532 | -60
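Note:
In practice, PARTITION BY only took effect when partitioning by a field of the STREAM.
Create the USER_SCORE_TABLE table with each player's game count, total win/loss (delta_sum) and total tax (tax_sum):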
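PARTITION BY username rewrites each record's key to the username, and the key determines which partition a record goes to, so all of one player's events end up in the same partition. A simplified Python model of that routing. It uses zlib.crc32 as a stand-in hash (Kafka's default partitioner uses murmur2, so real partition numbers will differ), and the partition count of 4 is an assumption matching --partitions 4 above.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List

NUM_PARTITIONS = 4  # assumption: same count as the score-normalized topic


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stand-in for Kafka's murmur2-based default partitioner.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def rekey(rows: Iterable[dict], field: str = "username") -> Dict[int, List[dict]]:
    """Set each row's key to row[field] and group rows by the partition it hashes to."""
    partitions: Dict[int, List[dict]] = defaultdict(list)
    for row in rows:
        key = row[field]
        partitions[partition_for(key)].append({**row, "ROWKEY": key})
    return dict(partitions)


# Hypothetical per-player events, using the usernames from the output above.
rows = [{"username": u, "delta": d}
        for u, d in [("lzb", -60), ("lzc", -60), ("lzb", 120), ("lzc", 60)]]
parts = rekey(rows)
```

The design point is that the key, not the value, drives placement: once username is the key, every later per-player operation finds all of that player's records in one partition.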
CREATE TABLE USER_SCORE_TABLE AS \
SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \
FROM USER_SCORE_EVENT_REKEY \
WHERE reason = 'game' \
GROUP BY username;
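USER_SCORE_TABLE is a running per-player aggregate: each new event for a username updates that player's row. A Python sketch of the same COUNT/SUM logic. Treating a null tax as 0 is an assumption chosen to match the tax_sum of 0 in the query output; KSQL's own null handling inside SUM may differ. The input is hypothetical: the four-player sample game with reason 'game', replayed four times.

```python
from typing import Dict, Iterable


def user_score_table(events: Iterable[dict]) -> Dict[str, dict]:
    table: Dict[str, dict] = {}
    for ev in events:
        if ev["reason"] != "game":          # WHERE reason = 'game'
            continue
        row = table.setdefault(ev["username"],
                               {"game_count": 0, "delta_sum": 0, "tax_sum": 0})
        row["game_count"] += 1              # COUNT(*)
        row["delta_sum"] += ev["delta"]     # SUM(delta)
        row["tax_sum"] += ev["tax"] or 0    # SUM(tax), null treated as 0 (assumption)
    return table


game = [("70837999", -60), ("0791754000", -60), ("a137671268", 180), ("abc6378303", -60)]
events = [{"username": u, "delta": d, "tax": None, "reason": "game"}
          for _ in range(4) for u, d in game]
table = user_score_table(events)
print(table["70837999"])  # {'game_count': 4, 'delta_sum': -240, 'tax_sum': 0}
```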
Query USER_SCORE_TABLE:
ksql> SELECT * FROM USER_SCORE_TABLE;
1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
1546709352758 | 0791754000 | 0791754000 | 4 | -240 | 0
1546709338711 | a137671268 | a137671268 | 4 | 720 | 0
1546709352758 | abc6378303 | abc6378303 | 4 | -240 | 0
- Query a single player's game count, total win/loss, and total tax contributed:
ksql> SELECT * FROM USER_SCORE_TABLE WHERE username = '70837999';
Output:
1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
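Count the total number of players (distinct)
Add a dummy column, tag, with the constant value 1, so that every row falls into the same group: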
CREATE TABLE USER_SCORE_WITH_TAG AS \
SELECT 1 AS tag, * FROM USER_SCORE_TABLE;
- Count the distinct players
SELECT tag, COUNT(username) \
FROM USER_SCORE_WITH_TAG \
GROUP BY tag;
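The constant tag column puts every row of USER_SCORE_TABLE into a single group, and since that table already holds one row per username, COUNT(username) over the group is the number of distinct players. The same three steps in Python, on hypothetical data:

```python
from collections import Counter
from typing import Iterable


def distinct_players(usernames: Iterable[str]) -> int:
    # Step 1 (USER_SCORE_TABLE): GROUP BY username keeps one row per player.
    per_player = Counter(usernames)
    # Step 2 (USER_SCORE_WITH_TAG): add a constant tag column to every row.
    tagged = [(1, name) for name in per_player]
    # Step 3: GROUP BY tag, COUNT(username) gives a single group with one count.
    counts = Counter(tag for tag, _ in tagged)
    return counts[1]


events = ["70837999", "0791754000", "70837999", "a137671268", "abc6378303", "a137671268"]
print(distinct_players(events))  # 4
```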
To be continued: the KSQL WINDOW feature.