[Kafka KSQL] Game Log Statistics and Analysis (1)

This post uses game settlement logs as an example to walk through statistical analysis of logs with KSQL.

Start Confluent

cd ~/Documents/install/confluent-5.0.1/
bin/confluent start

List the Kafka topics

bin/kafka-topics --list --zookeeper localhost:2181

Create a topic to receive the game settlement logs

bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic score-normalized

Write a log entry to the topic with the console producer

bin/kafka-console-producer --broker-list localhost:9092 --topic score-normalized
>
{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1区_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}
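Typing JSON by hand into the console producer is error-prone, so a small script that emits one settlement log per line can help. The following is a minimal sketch, not part of the original setup: the helper name `make_settlement_log` and its default values are assumptions for illustration, and the field names are taken from the sample message above.

```python
import json


def make_settlement_log(epoch_ms, game_id, gamers, cost=7,
                        game_type="situan", reason="xiayu"):
    """Serialize one settlement event as a single-line JSON string.

    Hypothetical helper: the field names mirror the sample log message.
    """
    event = {
        "cost": cost,
        "epoch": epoch_ms,
        "gameId": game_id,
        "gameType": game_type,
        "gamers": gamers,
        "reason": reason,
    }
    # ensure_ascii=False keeps the Chinese room name readable in the topic.
    return json.dumps(event, ensure_ascii=False)


gamers = [
    {"balance": 4405682, "delta": -60, "username": "0791754000"},
    {"balance": 69532, "delta": -60, "username": "70837999"},
    {"balance": 972120, "delta": -60, "username": "abc6378303"},
    {"balance": 23129, "delta": 180, "username": "a137671268"},
]

line = make_settlement_log(
    1512342568296, "2017-12-04_07:09:28_高手1区_200_015_185175", gamers)
print(line)
```

The printed line can be pasted at the producer prompt, or the script's output can be piped into the console producer's standard input.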

Check that the log was written correctly with the console consumer

bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic score-normalized --from-beginning
;; You should see:
{"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_高手1区_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}

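Start the KSQL client

bin/ksql http://localhost:8088

After startup you should see the KSQL banner and an interactive prompt.

List the Kafka topics from the KSQL prompt

ksql> show topics;

Print the messages in a topic

PRINT 'score-normalized';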

You should see:

Format:STRING
19-1-5 下午11时59分31秒 , NULL , {"cost":7, "epoch":1512342568296,"gameId":"2017-12-04_07:09:28_\xE9\xAB\x98\xE6\x89\x8B1\xE5\x8C\xBA_200_015_185175","gameType":"situan","gamers": [{"balance":4405682,"delta":-60,"username":"0791754000"}, {"balance":69532,"delta":-60,"username":"70837999"}, {"balance":972120,"delta":-60,"username":"abc6378303"}, {"balance":23129,"delta":180,"username":"a137671268"}],"reason":"xiayu"}

Here:

  • 19-1-5 下午11时59分31秒 is the message timestamp, formatted in the client's locale (11:59:31 PM on 2019-01-05).
  • NULL is the message key. Messages sent with kafka-console-producer have a NULL key by default.
  • The rest of the line is the message value.

Create a stream from the score-normalized topic

CREATE STREAM SCORE_EVENT \
 (epoch BIGINT, \
  gameType VARCHAR, \
  cost INTEGER, \
  gamers ARRAY< \
          STRUCT< \
                  username VARCHAR, \
                  balance BIGINT, \
                  delta BIGINT \
                  > \
           >, \
  gameId VARCHAR, \
  tax BIGINT, \
  reason VARCHAR) \
  WITH ( KAFKA_TOPIC='score-normalized', \
         VALUE_FORMAT='JSON', \
         TIMESTAMP='epoch');

TIMESTAMP='epoch' tells KSQL to use the epoch field of each message as the record timestamp.
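The epoch value in the sample log is a Unix timestamp in milliseconds. As a quick sanity check, the sketch below converts it to a wall-clock time and compares it with the date and time embedded in gameId. The UTC+8 offset is an assumption inferred only from the sample message, and `epoch_ms_to_local` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the game servers log in UTC+8. This is inferred only from the
# sample gameId and is not stated anywhere in the log itself.
GAME_TZ = timezone(timedelta(hours=8))


def epoch_ms_to_local(epoch_ms):
    """Convert a millisecond Unix timestamp to an aware datetime in GAME_TZ."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=GAME_TZ)


ts = epoch_ms_to_local(1512342568296)
# Format the timestamp the same way the gameId prefix is formatted.
print(ts.strftime("%Y-%m-%d_%H:%M:%S"))  # 2017-12-04_07:09:28
```

Under that assumption the result matches the 2017-12-04_07:09:28 prefix of the sample gameId, which suggests epoch is the settlement time of the game.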

Drop a stream

DROP STREAM stream_name;

If any queries are still reading from or writing to the stream, the statement fails with an error like:

Cannot drop USER_SCORE_EVENT.
The following queries read from this source: [].
The following queries write into this source: [CSAS_USER_SCORE_EVENT_2, InsertQuery_4, InsertQuery_5, InsertQuery_3].
You need to terminate them before dropping USER_SCORE_EVENT.

Stop those queries with TERMINATE first, then drop the stream:

TERMINATE CSAS_USER_SCORE_EVENT_2;
TERMINATE InsertQuery_4;

Query from the earliest record

ksql> SET 'auto.offset.reset' = 'earliest';

Query all data in the stream

ksql> SELECT * FROM SCORE_EVENT;

You should see:

1546702389664 | null | 1512342568296 | situan | 7 | [{USERNAME=0791754000, BALANCE=4405682, DELTA=-60}, {USERNAME=70837999, BALANCE=69532, DELTA=-60}, {USERNAME=abc6378303, BALANCE=972120, DELTA=-60}, {USERNAME=a137671268, BALANCE=23129, DELTA=180}] | 2017-12-04_07:09:28_高手1区_200_015_185175 | null | xiayu

Here, tax is null because the sample message has no tax field.

Count the games played on 2017-12-04

;; Add a game_date field to group on
CREATE STREAM SCORE_EVENT_WITH_DATE AS \
    SELECT SUBSTRING(gameId, 0, 10) AS game_date, * \
    FROM SCORE_EVENT;

SELECT game_date, COUNT(*) \
    FROM SCORE_EVENT_WITH_DATE \
    WHERE game_date = '2017-12-04' AND reason = 'game' \
    GROUP BY game_date;
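To make the intent of the two statements above concrete, here is a small Python sketch of the same logic on in-memory data: take the first 10 characters of gameId as the date, keep only events whose reason is game, and count per date. The event list is hypothetical test data, not real logs, and `game_date` is an illustrative helper.

```python
from collections import Counter


def game_date(game_id):
    """Return the date prefix of a gameId, e.g. "2017-12-04"."""
    return game_id[:10]


# Hypothetical events: only gameId and reason matter for this count.
events = [
    {"gameId": "2017-12-04_07:09:28_高手1区_200_015_185175", "reason": "game"},
    {"gameId": "2017-12-04_07:15:02_高手1区_200_015_185176", "reason": "game"},
    {"gameId": "2017-12-04_07:09:28_高手1区_200_015_185175", "reason": "xiayu"},
    {"gameId": "2017-12-05_08:00:00_高手1区_200_015_185177", "reason": "game"},
]

# Equivalent in spirit to: WHERE reason = 'game' GROUP BY game_date
games_per_date = Counter(
    game_date(e["gameId"]) for e in events if e["reason"] == "game")

print(games_per_date["2017-12-04"])  # 2
```

Unlike this batch computation, the KSQL query is continuous and emits an updated count each time a matching event arrives.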

KSQL does not currently support queries like the following:

SELECT COUNT(*) \
  FROM SCORE_EVENT \
  WHERE gameId LIKE '2017-12-04%';

Count the total number of players who took part in games (deduplicated)

A single log entry contains the results for several players, so the first step is to split it into one event per player.

Create the USER_SCORE_EVENT stream from the first player, then insert the remaining players into it:

CREATE STREAM USER_SCORE_EVENT AS \
    SELECT epoch, gameType, cost, gameId, tax, reason, gamers[0]->username AS username, gamers[0]->balance AS balance, gamers[0]->delta AS delta \
    FROM SCORE_EVENT;

INSERT INTO USER_SCORE_EVENT \
    SELECT epoch, gameType, cost, gameId, tax, reason, gamers[1]->username AS username, gamers[1]->balance AS balance, gamers[1]->delta AS delta \
    FROM SCORE_EVENT;

INSERT INTO USER_SCORE_EVENT \
    SELECT epoch, gameType, cost, gameId, tax, reason, gamers[2]->username AS username, gamers[2]->balance AS balance, gamers[2]->delta AS delta \
    FROM SCORE_EVENT;

INSERT INTO USER_SCORE_EVENT \
    SELECT epoch, gameType, cost, gameId, tax, reason, gamers[3]->username AS username, gamers[3]->balance AS balance, gamers[3]->delta AS delta \
    FROM SCORE_EVENT;
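The four statements above flatten the gamers array by hard-coding indices 0 to 3, which fits games with exactly four players. The same flattening can be sketched in plain Python to show the shape of the resulting per-player events. `flatten_gamers` is a hypothetical helper, and filling a missing tax with None is an assumption that mirrors the null shown in the KSQL output.

```python
def flatten_gamers(event):
    """Yield one flat record per entry in event["gamers"].

    Shared fields (epoch, gameType, cost, gameId, tax, reason) are copied
    into every record, next to that player's username, balance, and delta.
    """
    shared = {k: v for k, v in event.items() if k != "gamers"}
    # Assumption: a missing tax field becomes None, like KSQL's null.
    shared.setdefault("tax", None)
    for gamer in event["gamers"]:
        yield {**shared, **gamer}


event = {
    "cost": 7,
    "epoch": 1512342568296,
    "gameId": "2017-12-04_07:09:28_高手1区_200_015_185175",
    "gameType": "situan",
    "gamers": [
        {"balance": 4405682, "delta": -60, "username": "0791754000"},
        {"balance": 69532, "delta": -60, "username": "70837999"},
        {"balance": 972120, "delta": -60, "username": "abc6378303"},
        {"balance": 23129, "delta": 180, "username": "a137671268"},
    ],
    "reason": "xiayu",
}

user_events = list(flatten_gamers(event))
for record in user_events:
    print(record["username"], record["balance"], record["delta"])
```

The loop handles any number of players, whereas the KSQL version needs one INSERT INTO statement per array index.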

Re-key the stream by username (for example, for a later JOIN):

CREATE STREAM USER_SCORE_EVENT_REKEY AS \
SELECT * FROM USER_SCORE_EVENT \
PARTITION BY username;

Output:

ksql> SELECT * FROM USER_SCORE_EVENT_REKEY;
4000 | lzc | 4000 | situan | 7 | 2017-12-04_07:09:28_高手2区_500_015_185175 | null | game | lzc | 972120 | -60
4000 | lzb | 4000 | situan | 7 | 2017-12-04_07:09:28_高手2区_500_015_185175 | null | game | lzb | 69532 | -60

Note:

In practice, PARTITION BY only took effect when applied to a field of the stream.

Create the USER_SCORE_TABLE table, which holds each player's game count, total win/loss, and total tax contributed:

CREATE TABLE USER_SCORE_TABLE AS \
    SELECT username, COUNT(*) AS game_count, SUM(delta) AS delta_sum, SUM(tax) AS tax_sum \
    FROM USER_SCORE_EVENT_REKEY \
    WHERE reason = 'game' \
    GROUP BY username;

Query USER_SCORE_TABLE:

ksql> SELECT * FROM USER_SCORE_TABLE;
1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
1546709352758 | 0791754000 | 0791754000 | 4 | -240 | 0
1546709338711 | a137671268 | a137671268 | 4 | 720 | 0
1546709352758 | abc6378303 | abc6378303 | 4 | -240 | 0

  • Query one player's game count, total win/loss, and total tax contributed:
ksql> SELECT * FROM USER_SCORE_TABLE WHERE username = '70837999';

Output:

1546709338711 | 70837999 | 70837999 | 4 | -240 | 0
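The per-player aggregation behind USER_SCORE_TABLE can be illustrated with a short Python sketch: keep events whose reason is game, group by username, and accumulate a count and two sums. The input is hypothetical (the sample settlement replayed four times with reason set to game), `aggregate_by_user` is an illustrative helper, and counting a missing tax as 0 is an assumption chosen to match the 0 in the output above.

```python
from collections import defaultdict


def aggregate_by_user(user_events):
    """Return {username: {"game_count", "delta_sum", "tax_sum"}}."""
    totals = defaultdict(
        lambda: {"game_count": 0, "delta_sum": 0, "tax_sum": 0})
    for e in user_events:
        if e["reason"] != "game":  # same filter as WHERE reason = 'game'
            continue
        row = totals[e["username"]]
        row["game_count"] += 1
        row["delta_sum"] += e["delta"]
        # Assumption: a missing or null tax counts as 0.
        row["tax_sum"] += e.get("tax") or 0
    return dict(totals)


# Hypothetical input: the sample settlement replayed 4 times as 'game'.
deltas = [("0791754000", -60), ("70837999", -60),
          ("abc6378303", -60), ("a137671268", 180)]
user_events = [
    {"username": name, "delta": delta, "tax": None, "reason": "game"}
    for _ in range(4)
    for name, delta in deltas
]

table = aggregate_by_user(user_events)
print(table["70837999"])
```

With this input, player 70837999 ends up with 4 games and a delta sum of -240, which lines up with the corresponding row of the table output.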

Count the total number of players (deduplicated)

  • Add a dummy column to group on
CREATE TABLE USER_SCORE_WITH_TAG AS \
    SELECT 1 AS tag, * FROM USER_SCORE_TABLE;

  • Count the deduplicated players
SELECT tag, COUNT(username) \
FROM USER_SCORE_WITH_TAG \
GROUP BY tag;
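The dummy-column trick works because USER_SCORE_TABLE already has one row per username, so grouping every row under the same constant and counting gives the number of distinct players. A minimal Python sketch of that idea follows. The rows are hypothetical stand-ins for the table contents and `count_by_tag` is an illustrative helper.

```python
def count_by_tag(table_rows):
    """Tag every row with the constant 1, then count rows per tag."""
    counts = {}
    for row in table_rows:
        tagged = {"tag": 1, **row}  # same role as SELECT 1 AS tag, *
        counts[tagged["tag"]] = counts.get(tagged["tag"], 0) + 1
    return counts


# Hypothetical table contents: one row per player, as in USER_SCORE_TABLE.
rows = [
    {"username": "70837999"},
    {"username": "0791754000"},
    {"username": "a137671268"},
    {"username": "abc6378303"},
]

counts = count_by_tag(rows)
print(counts)  # {1: 4}
```

The count is only a distinct count because the input has already been reduced to one row per player. Applied to the raw per-player events, the same grouping would count events rather than players.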

Next: the KSQL WINDOW feature.