试验环境打算Kafka安装
访问Kafka官方下载页面,下载稳定版本0.10.1.0的kafka.此安装包内早已附送,不须要额外安装.按次序执行如下方法:
cd ~/下载
sudo tar -zxf kafka_2.11-2.4.1.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.11-2.4.1/ ./kafka
sudo chown -R hadoop ./kafka
但是根据教程的版本会出bug,因此我们选择比较新的版本:
cd ~/下载
sudo tar -zxf kafka_2.12-3.2.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-3.2.0/ ./kafka
sudo chown -R hadoop ./kafka
检测简略例子
接出来在系统环境下检测简略的例子。Mac系统请自己根据安装的位置,切换到相应的指令。按次序执行如下命令:
# 进入kafka所在的目录
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
命令执行后不会返回Shell命令键入状态,还会根据默认的配置文件启动服务,请千万不要关掉当前终端.启动新的终端,键入如下命令:
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
kafka服务端就启动了,请千万不要关掉当前终端。启动另外一个终端,键入如下命令:
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
topic是公布消息公布的,以单节点的配置争创了一个叫dblab的topic.可以用list列举所有争创的,来查看刚刚争创的主题是否存在。
bin/kafka-topics.sh --list --zookeeper localhost:2181
可以在结果中查看到dblab这个topic存在。接下去用生产点数据:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab
并尝试键入如下信息:
hello hadoop
hello xmu
hadoop world
之后重新开启新的终端或则直接按CTRL+C退出。之后使用来接收数据,键入如下命令:
cd /usr/local/kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning
./bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic codesheep --from-beginning
便可以看见刚刚形成的三条信息。说明kafka安装成功。
数据处理和操作Kafka数据预处理
数据集如下:
用户行为日志.csv,日志中的数组定义如下:
|卖家|商品|商品类型|商家|品牌|交易时间:月day|交易丑闻:日|行为,取值范围{0,1,2,3},0表示点击,1表示加入购物车,2表示订购,3表示关注商品|卖家年纪分段:1表示年纪=50,0和NULL则表示未知|性别:0表示男性,1表示女性,2和NULL表示未知|收获地址省市
数据详细格式如下:
,,,,,month,day,,,,
,,833,2882,2661,08,29,0,0,1,内蒙
,,1271,2882,2661,08,29,0,1,1,广东
,,1271,2882,2661,08,29,0,2,1,广东
,,1271,2882,2661,08,29,0,1,1,内蒙
,,1271,1253,1049,08,29,0,0,2,广东
,,1271,2882,2661,08,29,0,0,2,广东
,,1467,2882,2661,08,29,0,5,2,广东
,,1095,883,1647,08,29,0,7,1,广东
这个案例实时统计每秒中男女生购物数量,所以针对每条购物日志,我们只须要获取即可,于是发送给Kafka,接下去再接收进行处理。
执行如下Shell命令来安装操作Kafka的代码库:
conda install kafka-python
后来是.py的代码:
# coding: utf-8
import csv
import time
from kafka import KafkaProducer
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 打开数据文件
csvfile = open("../data/user_log.csv","r")
# 生成一个可用于读取csv文件的reader
reader = csv.reader(csvfile)
for line in reader:
gender = line[9] # 性别在每行日志代码的第9个元素
if gender == 'gender':
continue # 去除第一行表头
time.sleep(0.1) # 每隔0.1秒发送一行数据
# 发送数据,topic为'sex'
producer.send('sex',line[9].encode('utf8'))
上述代码很简略,首先是先例子化一个Kafka生产者。之后调用用户日志文件,每天调用一行,接着每隔0.1秒发送给Kafka,那样一秒发送10条购物日志。这儿发送给Kafka的topic为’sex’。
操作kafka
我们可以写一个检测数据是否投递成功,代码如下,文件名为.py:
from kafka import KafkaConsumer
consumer = KafkaConsumer('sex')
for msg in consumer:
print((msg.value).decode('utf8'))
在开启上述和之前,还要先开启Kafka,命令如下:
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
打开一个新的命令行窗口,键入命令如下:
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
在Kafka开启后来,即可开启和。
这时,你会看见屏幕上会输出一行又一行的数字,类似下边的样子:
实时处理数据配置Spark开发Kafka环境
下载Spark连结Kafka的代码库。之后把下载的代码库放在目录/usr/local/spark/jars目录下,命令如下:
sudo mv ~/下载/spark-streaming_2.12-3.2.0.jar /usr/local/spark/jars
sudo mv ~/下载/spark-streaming-kafka-0-10_2.12-3.2.0.jar /usr/local/spark/jars
于是在/usr/local/spark/jars目录下改建kafka目录,把/usr/local/kafka/libs下所有函数库复制到/usr/local/spark/jars/kafka目录下,命令如下
cd /usr/local/spark/jars
mkdir kafka
cd kafka
cp /usr/local/kafka/libs/* .
之后,更改Spark配置文件,命令如下:
cd /usr/local/spark/conf
sudo vim spark-env.sh
把Kafka相关jar包的路径信息提高到spark-env.sh,更改后的spark-env.sh类似如下:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoopclasspath):/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*
由于我使用的是中争创的环境,因此介绍一下,如何为spark设置环境。
还要更改conf目录下的.sh:在这个文件的开头添加:
export PYSPARK_PYTHON=/home/hadoop/anaconda3/envs/py37/bin/python
#其中,py37应该为自行创建的环境名称
!还要留意的是,作者在给出代码的时侯/usr/local//bin/这一段是不用写的实时分析程序考虑,假如直接复制起来会报错,去除后来就好了。
推行项目
首先在/usr/local/spark/改建项目目录
cd /usr/local/spark/mycode
mkdir kafka
于是在kafka这个目录下争创一个.py文件。
from kafka import KafkaProducer
from pyspark.streaming import StreamingContext
#from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext
import json
import sys
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType
from pyspark.sql.functions import col, column, expr
def KafkaWordCount(zkQuorum, group, topics, numThreads):
spark = SparkSession \
.builder \
.appName("KafkaWordCount") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
topicAry = topics.split(",")
# 将topic转换为hashmap形式,而python中字典就是一种hashmap
topicMap = {}
for topic in topicAry:
topicMap[topic] = numThreads
#lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(lambda x : x[1])
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sex") \
.load()
df.selectExpr( "CAST(timestamp AS timestamp)","CAST(value AS STRING)")
#lines = df.selectExpr("CAST(value AS STRING)")
windowedCounts = df \
.withWatermark("timestamp", "1 seconds") \
.groupBy(
window(col("timestamp"), "1 seconds" ,"1 seconds"),
col("value")) \
.count()
wind = windowedCounts.selectExpr( "CAST(value AS STRING)","CAST(count AS STRING)")
query = wind.writeStream.option("checkpointLocation", "/check").outputMode("append").foreach(sendmsg).start()
query.awaitTermination()
query.stop()
# 格式转化,将格式变为[{1: 3}]
def Get_dic(row):
res = []
#for elm in row:
tmp = {row[0]: row[1]}
res.append(tmp)
print(res)
return json.dumps(res)
def sendmsg(row):
print(row)
if row.count != 0:
msg = Get_dic(row)
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send("result", msg.encode('utf8'))
# 很重要,不然不会更新
producer.flush()
if __name__ == '__main__':
# 输入的四个参数分别代表着
# 1.zkQuorum为zookeeper地址
# 2.group为消费者所在的组
# 3.topics该消费者所消费的topics
# 4.numThreads开启消费topic线程的个数
if (len(sys.argv) < 5):
print("Usage: KafkaWordCount " )
exit(1)
zkQuorum = sys.argv[1]
group = sys.argv[2]
topics = sys.argv[3]
numThreads = int(sys.argv[4])
print(group, topics)
KafkaWordCount(zkQuorum, group, topics, numThreads)
代码功能:
首先按每秒的速率调用Kafka消息;之后对每秒的数据执行算法,统计出0的个数,1的个数,2的个数;最后将上述结果封装成json发送给Kafka。
在运行代码之前,先启动:
cd /usr/local/hadoop #这是hadoop的安装目录
./sbin/start-dfs.sh
运行项目
编撰好程序以后,接下去撰写运行脚本,在/usr/local/spark//kafka目录下改建.sh文件,键入如下内容:
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 /usr/local/spark/mycode/kafka/kafka_test.py 127.0.0.1:2181 1 sex 1
其中最后四个为键入参数,涵义如下
127.0.0.1:2181为地址1为group标签sex为消费者接收的为消费者句柄数
sh startup.sh
最后在/usr/local/spark//kafka目录下,运行如下命令即可执行刚编撰好的程序
报错:
结果展示
运用Flask争创web程序,运用Flask-实现实时推送数据,运用.io.js实现实时接收数据,.js呈现数据
Flask-实时推送数据
首先我们争创如图中的app.py文件,app.py的功能就是作为一个简易的服务器,处理连结恳求实时分析程序考虑,以及处理从kafka接收的数据,并实时推送到浏览器。app.py的代码如下:
import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer
# 因为第一步骤安装好了flask,所以这里可以引用
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None
# 实例化一个consumer,接收topic为result的消息
consumer = KafkaConsumer('result')
# 一个后台线程,持续接收Kafka消息,并发送给客户端浏览器
def background_thread():
girl = 0
boy = 0
for msg in consumer:
data_json = msg.value.decode('utf8')
data_list = json.loads(data_json)
for data in data_list:
if '0' in data.keys():
girl = data['0']
elif '1' in data.keys():
boy = data['1']
else:
continue
result = str(girl) + ',' + str(boy)
print(result)
socketio.emit('test_message', {'data': result})
# 客户端发送connect事件时的处理函数
@socketio.on('test_connect')
def connect(message):
print(message)
global thread
if thread is None:
# 单独开启一个线程给客户端发送数据
thread = socketio.start_background_task(target=background_thread)
socketio.emit('connected', {'data': 'Connected'})
# 通过访问http://127.0.0.1:5000/访问index.html
@app.route("/")
def handle_mes():
return render_template("index.html")
# main函数
if __name__ == '__main__':
socketio.run(app, debug=True)
这段代码实现比较简略,最重要就是函数,该函数从Kafka接收消息,并进行处理,荣获男女生每秒钟数量,于是将结果通过函数.emit实时推送至浏览器。
浏览器获取数据并展示
index.html文件负责获取数据并展示疗效,该文件中的代码内容如下:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>DashBoard</title>
<script src="static/js/socket.io.js"></script>
<script src="static/js/jquery-3.1.1.min.js"></script>
<script src="static/js/highcharts.js"></script>
<script src="static/js/exporting.js"></script>
<script type="text/javascript" charset="utf-8">
var socket = io.connect('http://' + document.domain + ':' + location.port);
socket.on('connect', function() {
socket.emit('test_connect', {data: 'I\'m connected!'});
});
socket.on('test_message',function(message){
console.log(message);
var obj = eval(message);
var result = obj["data"].split(",");
$('#girl').html(result[0]);
$('#boy').html(result[1]);
});
socket.on('connected',function(){
console.log('connected');
});
socket.on('disconnect', function () {
console.log('disconnect');
});
</script>
</head>
<body>
<div>
<b>Girl: </b><b id="girl"></b>
<b>Boy: </b><b id="boy"></b>
</div>
<div id="container" style="width: 600px;height:400px;"></div>
<script type="text/javascript">
$(document).ready(function () {
Highcharts.setOptions({
global: {
useUTC: false
}
});
Highcharts.chart('container', {
chart: {
type: 'spline',
animation: Highcharts.svg, // don't animate in old IE
marginRight: 10,
events: {
load: function () {
// set up the updating of the chart each second
var series1 = this.series[0];
var series2 = this.series[1];
setInterval(function () {
var x = (new Date()).getTime(), // current time
count1 = $('#girl').text();
y = parseInt(count1);
series1.addPoint([x, y], true, true);
count2 = $('#boy').text();
z = parseInt(count2);
series2.addPoint([x, z], true, true);
}, 1000);
}
}
},
title: {
text: '男女生购物人数实时分析'
},
xAxis: {
type: 'datetime',
tickPixelInterval: 50
},
yAxis: {
title: {
text: '数量'
},
plotLines: [{
value: 0,
width: 1,
color: '#808080'
}]
},
tooltip: {
formatter: function () {
return '' + this.series.name + '
' +
Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '
' +
Highcharts.numberFormat(this.y, 2);
}
},
legend: {
enabled: true
},
exporting: {
enabled: true
},
series: [{
name: '女生购物人数',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
},
{
name: '男生购物人数',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}]
});
});
</script>
</body>
</html>
.io.js
在index.html中包含了如下一段代码,就是拿来读取.io.js和.io.js.map这两个js库文件的:
<script type="text/javascript" charset="utf-8">
// 创建连接服务器的链接
var socket = io.connect('http://' + document.domain + ':' + location.port);
socket.on('connect', function() {// 连上服务器后的回调函数
socket.emit('connect', {data: 'I\'m connected!'});
});
// 接收服务器实时发送的数据
socket.on('test_message',function(message){
console.log(message);
var obj = eval(message);
var result = obj["data"].split(",");
// 将男生和女生人数展示在html标签内
$('#girl').html(result[0]);
$('#boy').html(result[1]);
});
socket.on('connected',function(){
console.log('connected');
});
// 链接断开时的回调函数
socket.on('disconnect', function () {
console.log('disconnect');
});
</script>
.js
在index.html中包含如下一段代码,就是读取.js库,来实时地从html标签内获取数据并展示在网页中。
<script type="text/javascript">
$(document).ready(function () {
Highcharts.setOptions({
global: {
useUTC: false
}
});
Highcharts.chart('container', {
chart: {
type: 'spline',
animation: Highcharts.svg, // 这个在ie浏览器可能不支持
marginRight: 10,
events: {
load: function () {
//设置图表每秒更新一次
var series1 = this.series[0];
var series2 = this.series[1];
setInterval(function () {
var x = (new Date()).getTime();// 获取当前时间
count1 = $('#girl').text();
y = parseInt(count1);
series1.addPoint([x, y], true, true);
count2 = $('#boy').text();
z = parseInt(count2);
series2.addPoint([x, z], true, true);
}, 1000);
}
}
},
title: { //设置图表名
text: '男女生购物人数实时分析'
},
xAxis: { //x轴设置为实时时间
type: 'datetime',
tickPixelInterval: 50
},
yAxis: {
title: {
text: '数量'
},
plotLines: [{ //设置坐标线颜色粗细
value: 0,
width: 1,
color: '#808080'
}]
},
tooltip: {
//规范显示时间的格式
formatter: function () {
return '' + this.series.name + '
' +
Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '
' +
Highcharts.numberFormat(this.y, 2);
}
},
legend: {
enabled: true
},
exporting: {
enabled: true
},
series: [{
name: '女生购物人数',
data: (function () {
// 随机方式生成初始值填充图表
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
},
{
name: '男生购物人数',
data: (function () {
// 随机方式生成初始值填充图表
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}]
});
});
.js
.js这个库文件的功能是实现导入功能。
疗效展示
python app.py
问题与处理问题1spark-env.sh添加代码错误
把Kafka相关jar包的路径信息提高到spark-env.sh,更改后的spark-env.sh类似如下:
原代码:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoopclasspath):/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*
!还要留意的是,作者在给出代码的时侯/usr/local//bin/这一段是不用写的,假如直接复制起来会报错,去除后来就好了。
问题2相关包没有安装
pip install gevent-websocket -i https://pypi.tuna.tsinghua.edu.cn/simple pip --trusted-host pypi.tuna.tsi
如上图,按照报错安装好包就行了。
问题3java版本不一致
通过代码
java -version
javac -version
检测
这儿经过检测没有问题。
问题4启动晚报错
究其缘由,是方法三复现出了问题: