网站做5年有多少流量购买虚拟机建网站
- 作者: 五速梦信息网
- 时间: 2026年03月21日 07:17
当前位置: 首页 > news >正文
网站做5年有多少流量,购买虚拟机建网站,网站建设玖金手指排名11,扬州建设网站公司二 根据用户行为数据创建ALS模型并召回商品 2.0 用户行为数据拆分 方便练习可以对数据做拆分处理 pandas的数据分批读取 chunk 厚厚的一块 相当大的数量或部分 import pandas as pd reader pd.read_csv(behavior_log.csv,chunksize100,iteratorTrue) count 0; for chunk in …二 根据用户行为数据创建ALS模型并召回商品 2.0 用户行为数据拆分 方便练习可以对数据做拆分处理 pandas的数据分批读取 chunk 厚厚的一块 相当大的数量或部分 import pandas as pd reader pd.read_csv(behavior_log.csv,chunksize100,iteratorTrue) count 0; for chunk in reader:count 1if count 1:chunk.to_csv(test4.csv,index False)elif count1 and count1000:chunk.to_csv(test4.csv,index False, mode a,header False)else:break pd.read_csv(test4.csv)2.1 预处理behavior_log数据集 创建spark session import os
配置spark driver和pyspark运行时所使用的python解释器路径
PYSPARK_PYTHON /home/hadoop/miniconda3/envs/datapy365spark23/bin/python JAVA_HOME/home/hadoop/app/jdk1.8.0_191
当存在多个版本时不指定很可能会导致出错
os.environ[PYSPARK_PYTHON] PYSPARK_PYTHON os.environ[PYSPARK_DRIVER_PYTHON] PYSPARK_PYTHON os.environ[JAVA_HOME]JAVA_HOME
spark配置信息
from pyspark import SparkConf from pyspark.sql import SparkSessionSPARK_APP_NAME preprocessingBehaviorLog SPARK_URL spark://192.168.199.188:7077conf SparkConf() # 创建spark config对象 config ((spark.app.name, SPARK_APP_NAME), # 设置启动的spark的app名称没有提供将随机产生一个名称(spark.executor.memory, 6g), # 设置该app启动时占用的内存用量默认1g(spark.master, SPARK_URL), # spark master的地址(spark.executor.cores, 4), # 设置spark executor使用的CPU核心数# 以下三项配置可以控制执行器数量
(spark.dynamicAllocation.enabled, True),
(spark.dynamicAllocation.initialExecutors, 1), # 1个执行器
(spark.shuffle.service.enabled, True)
(spark.sql.pivotMaxValues, 99999), # 当需要pivot DF且值很多时需要修改默认是10000
)
查看更详细配置及说明https://spark.apache.org/docs/latest/configuration.htmlconf.setAll(config)# 利用config对象创建spark session
spark SparkSession.builder.config(confconf).getOrCreate()从hdfs中加载csv文件为DataFrame
从hdfs加载CSV文件为DataFrame
df spark.read.csv(hdfs://localhost:9000/datasets/behavior_log.csv, headerTrue) df.show() # 查看dataframe默认显示前20条
大致查看一下数据类型
df.printSchema() # 打印当前dataframe的结构显示结果:
| user|time_stamp|btag| cate| brand|
|558157|1493741625| pv| 6250| 91286| |558157|1493741626| pv| 6250| 91286| |558157|1493741627| pv| 6250| 91286| |728690|1493776998| pv|11800| 62353| |332634|1493809895| pv| 1101|365477| |857237|1493816945| pv| 1043|110616| |619381|1493774638| pv| 385|428950| |467042|1493772641| pv| 8237|301299| |467042|1493772644| pv| 8237|301299| |991528|1493780710| pv| 7270|274795| |991528|1493780712| pv| 7270|274795| |991528|1493780712| pv| 7270|274795| |991528|1493780712| pv| 7270|274795| |991528|1493780714| pv| 7270|274795| |991528|1493780765| pv| 7270|274795| |991528|1493780714| pv| 7270|274795| |991528|1493780765| pv| 7270|274795| |991528|1493780764| pv| 7270|274795| |991528|1493780633| pv| 7270|274795|
|991528|1493780764| pv| 7270|274795|
only showing top 20 rowsroot|– user: string (nullable true)|– time_stamp: string (nullable true)|– btag: string (nullable true)|– cate: string (nullable true)|– brand: string (nullable true)从hdfs加载数据为dataframe并设置结构 from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
构建结构对象
schema StructType([StructField(userId, IntegerType()),StructField(timestamp, LongType()),StructField(btag, StringType()),StructField(cateId, IntegerType()),StructField(brandId, IntegerType()) ])
从hdfs加载数据为dataframe并设置结构
behavior_log_df spark.read.csv(hdfs://localhost:8020/datasets/behavior_log.csv, headerTrue, schemaschema) behavior_log_df.show()
behavior_log_df.count() 显示结果:
|userId| timestamp|btag|cateId|brandId|
|558157|1493741625| pv| 6250| 91286| |558157|1493741626| pv| 6250| 91286| |558157|1493741627| pv| 6250| 91286| |728690|1493776998| pv| 11800| 62353| |332634|1493809895| pv| 1101| 365477| |857237|1493816945| pv| 1043| 110616| |619381|1493774638| pv| 385| 428950| |467042|1493772641| pv| 8237| 301299| |467042|1493772644| pv| 8237| 301299| |991528|1493780710| pv| 7270| 274795| |991528|1493780712| pv| 7270| 274795| |991528|1493780712| pv| 7270| 274795| |991528|1493780712| pv| 7270| 274795| |991528|1493780714| pv| 7270| 274795| |991528|1493780765| pv| 7270| 274795| |991528|1493780714| pv| 7270| 274795| |991528|1493780765| pv| 7270| 274795| |991528|1493780764| pv| 7270| 274795| |991528|1493780633| pv| 7270| 274795|
|991528|1493780764| pv| 7270| 274795|
only showing top 20 rowsroot|– userId: integer (nullable true)|– timestamp: long (nullable true)|– btag: string (nullable true)|– cateId: integer (nullable true)|– brandId: integer (nullable true)分析数据集字段的类型和格式 查看是否有空值查看每列数据的类型查看每列数据的类别情况
print(查看userId的数据情况, behavior_log_df.groupBy(userId).count().count())
约113w用户
#注意behavior_log_df.groupBy(userId).count() 返回的是一个dataframe这里的count计算的是每一个分组的个数但当前还没有进行计算
当调用df.count()时才开始进行计算这里的count计算的是dataframe的条目数也就是共有多少个分组查看user的数据情况 1136340print(查看btag的数据情况, behavior_log_df.groupBy(btag).count().collect()) # collect会把计算结果全部加载到内存谨慎使用
只有四种类型数据pv、fav、cart、buy
这里由于类型只有四个所以直接使用collect把数据全部加载出来查看btag的数据情况 [Row(btagbuy, count9115919), Row(btagfav, count9301837), Row(btagcart, count15946033), Row(btagpv, count688904345)]print(查看cateId的数据情况, behavior_log_df.groupBy(cateId).count().count())
约12968类别id查看cateId的数据情况 12968print(查看brandId的数据情况, behavior_log_df.groupBy(brandId).count().count())
约460561品牌id查看brandId的数据情况 460561print(判断数据是否有空值, behavior_log_df.count(), behavior_log_df.dropna().count())
约7亿条目723268134 723268134
本数据集无空值条目可放心处理判断数据是否有空值 723268134 723268134pivot透视操作把某列里的字段值转换成行并进行聚合运算(pyspark.sql.GroupedData.pivot) 如果透视的字段中的不同属性值超过10000个则需要设置spark.sql.pivotMaxValues否则计算过程中会出现错误。文档介绍。
统计每个用户对各类商品的pv、fav、cart、buy数量
cate_count_df behavior_log_df.groupBy(behavior_log_df.userId, behavior_log_df.cateId).pivot(btag,[pv,fav,cart,buy]).count() cate_count_df.printSchema() # 此时还没有开始计算显示效果: root|– userId: integer (nullable true)|– cateId: integer (nullable true)|– pv: long (nullable true)|– fav: long (nullable true)|– cart: long (nullable true)|– buy: long (nullable true)统计每个用户对各个品牌的pv、fav、cart、buy数量并保存结果
统计每个用户对各个品牌的pv、fav、cart、buy数量
brand_count_df behavior_log_df.groupBy(behavior_log_df.userId, behavior_log_df.brandId).pivot(btag,[pv,fav,cart,buy]).count()
brand_count_df.show() # 同上
113w * 46w
由于运算时间比较长所以这里先将结果存储起来供后续其他操作使用
写入数据时才开始计算
cate_count_df.write.csv(hdfs://localhost:9000/preprocessing_dataset/cate_count.csv, headerTrue) brand_count_df.write.csv(hdfs://localhost:9000/preprocessing_dataset/brand_count.csv, headerTrue)2.2 根据用户对类目偏好打分训练ALS模型 根据您统计的次数 打分规则 偏好打分数据集 ALS模型
spark ml的模型训练是基于内存的如果数据过大内存空间小迭代次数过多的化可能会造成内存溢出报错
设置Checkpoint的话会把所有数据落盘这样如果异常退出下次重启后可以接着上次的训练节点继续运行
但该方法其实指标不治本因为无法防止内存溢出所以还是会报错
如果数据量大应考虑的是增加内存、或限制迭代次数和训练数据量级等
spark.sparkContext.setCheckpointDir(hdfs://localhost:8020/checkPoint/) from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType# 构建结构对象 schema StructType([StructField(userId, IntegerType()),StructField(cateId, IntegerType()),StructField(pv, IntegerType()),StructField(fav, IntegerType()),StructField(cart, IntegerType()),StructField(buy, IntegerType()) ])# 从hdfs加载CSV文件 cate_count_df spark.read.csv(hdfs://localhost:9000/preprocessing_dataset/cate_count.csv, headerTrue, schemaschema) cate_count_df.printSchema() cate_count_df.first() # 第一行数据显示结果: root|– userId: integer (nullable true)|– cateId: integer (nullable true)|– pv: integer (nullable true)|– fav: integer (nullable true)|– cart: integer (nullable true)|– buy: integer (nullable true)Row(userId1061650, cateId4520, pv2326, favNone, cart53, buyNone)处理每一行数据r表示row对象 def process_row®:# 处理每一行数据r表示row对象# 偏好评分规则# m: 用户对应的行为次数# 该偏好权重比例次数上限仅供参考具体数值应根据产品业务场景权衡# pv: if m20: score0.2*m; else score4# fav: if m20: score0.4*m; else score8# cart: if m20: score0.6*m; else score12# buy: if m20: score1*m; else score20# 注意这里要全部设为浮点数spark运算时对类型比较敏感要保持数据类型都一致pv_count r.pv if r.pv else 0.0fav_count r.fav if r.fav else 0.0cart_count r.cart if r.cart else 0.0buy_count r.buy if r.buy else 0.0pv_score 0.2*pv_count if pv_count20 else 4.0fav_score 0.4*fav_count if fav_count20 else 8.0cart_score 0.6*cart_count if cart_count20 else 12.0buy_score 1.0*buy_count if buy_count20 else 20.0rating pv_score fav_score cart_score buy_score# 返回用户ID、分类ID、用户对分类的偏好打分return r.userId, r.cateId, rating返回一个PythonRDD类型
返回一个PythonRDD类型此时还没开始计算
cate_count_df.rdd.map(process_row).toDF([userId, cateId, rating])显示结果: DataFrame[userId: bigint, cateId: bigint, rating: double]用户对商品类别的打分数据
用户对商品类别的打分数据
map返回的结果是rdd类型需要调用toDF方法转换为Dataframe
cate_rating_df cate_count_df.rdd.map(process_row).toDF([userId, cateId, rating])
注意toDF不是每个rdd都有的方法仅局限于此处的rdd
可通过该方法获得 user-cate-matrix
但由于cateId字段过多这里运算量比很大机器内存要求很高才能执行否则无法完成任务
请谨慎使用# 但好在我们训练ALS模型时不需要转换为user-cate-matrix所以这里可以不用运行
cate_rating_df.groupBy(userId).povit(cateId).min(rating)
用户对类别的偏好打分数据
cate_rating_df显示结果: DataFrame[userId: bigint, cateId: bigint, rating: double]通常如果USER-ITEM打分数据应该是通过一下方式进行处理转换为USER-ITEM-MATRIX [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4dF4oXj6-1691649134814)(/img/CF%E4%BB%8B%E7%BB%8D.png)] 但这里我们将使用的Spark的ALS模型进行CF推荐因此注意这里数据输入不需要提前转换为矩阵直接是 USER-ITEM-RATE的数据 基于Spark的ALS隐因子模型进行CF评分预测 ALS的意思是交替最小二乘法Alternating Least Squares是Spark2.*中加入的进行基于模型的协同过滤model-based CF的推荐系统算法。 同SVD它也是一种矩阵分解技术对数据进行降维处理。 详细使用方法pyspark.ml.recommendation.ALS 注意由于数据量巨大因此这里也不考虑基于内存的CF算法 参考为什么Spark中只有ALS
使用pyspark中的ALS矩阵分解方法实现CF评分预测
文档地址https://spark.apache.org/docs/2.2.2/api/python/pyspark.ml.html?highlightvectors#module-pyspark.ml.recommendation
from pyspark.ml.recommendation import ALS # mldataframe mllibrdd# 利用打分数据训练ALS模型 als ALS(userColuserId, itemColcateId, ratingColrating, checkpointInterval5)# 此处训练时间较长 model als.fit(cate_rating_df)模型训练好后调用方法进行使用具体API查看
model.recommendForAllUsers(N) 给所有用户推荐TOP-N个物品
ret model.recommendForAllUsers(3)
由于是给所有用户进行推荐此处运算时间也较长
ret.show()
推荐结果存放在recommendations列中
ret.select(recommendations).show()显示结果:
|userId| recommendations|
| 148|[[3347, 12.547271…| | 463|[[1610, 9.250818]…| | 471|[[1610, 10.246621…| | 496|[[1610, 5.162216]…| | 833|[[5607, 9.065482]…| | 1088|[[104, 6.886987],…| | 1238|[[5631, 14.51981]…| | 1342|[[5720, 10.89842]…| | 1580|[[5731, 8.466453]…| | 1591|[[1610, 12.835257…| | 1645|[[1610, 11.968531…| | 1829|[[1610, 17.576496…| | 1959|[[1610, 8.353473]…| | 2122|[[1610, 12.652732…| | 2142|[[1610, 12.48068]…| | 2366|[[1610, 11.904813…| | 2659|[[5607, 11.699315…| | 2866|[[1610, 7.752719]…| | 3175|[[3347, 2.3429515…|
| 3749|[[1610, 3.641833]…|
only showing top 20 rows——————–
| recommendations |
|---|
| [[3347, 12.547271… |
| [[1610, 9.250818]… |
| [[1610, 10.246621… |
| [[1610, 5.162216]… |
| [[5607, 9.065482]… |
| [[104, 6.886987],… |
| [[5631, 14.51981]… |
| [[5720, 10.89842]… |
| [[5731, 8.466453]… |
| [[1610, 12.835257… |
| [[1610, 11.968531… |
| [[1610, 17.576496… |
| [[1610, 8.353473]… |
| [[1610, 12.652732… |
| [[1610, 12.48068]… |
| [[1610, 11.904813… |
| [[5607, 11.699315… |
| [[1610, 7.752719]… |
| [[3347, 2.3429515… |
| [[1610, 3.641833]… |
only showing top 20 rowsmodel.recommendForUserSubset 给部分用户推荐TOP-N个物品
注意recommendForUserSubset API2.2.2版本中无法使用
dataset spark.createDataFrame([[1],[2],[3]]) dataset dataset.withColumnRenamed(_1, userId) ret model.recommendForUserSubset(dataset, 3)# 只给部分用推荐运算时间短 ret.show()
ret.collect() # 注意 collect会将所有数据加载到内存慎用显示结果:
|userId| recommendations|
| 1|[[1610, 25.4989],…| | 3|[[5607, 13.665942…| | 2|[[5579, 5.9051886…| ————————–[Row(userId1, recommendations[Row(cateId1610, rating25.498899459838867), Row(cateId5737, rating24.901548385620117), Row(cateId3347, rating20.736785888671875)]),Row(userId3, recommendations[Row(cateId5607, rating13.665942192077637), Row(cateId1610, rating11.770171165466309), Row(cateId3347, rating10.35690689086914)]),Row(userId2, recommendations[Row(cateId5579, rating5.90518856048584), Row(cateId2447, rating5.624575138092041), Row(cateId5690, rating5.2555742263793945)])]transform中提供userId和cateId可以对打分进行预测利用打分结果排序后
transform中提供userId和cateId可以对打分进行预测利用打分结果排序后同样可以实现TOP-N的推荐
model.transform
将模型进行存储
model.save(hdfs://localhost:8020/models/userCateRatingALSModel.obj)
测试存储的模型
from pyspark.ml.recommendation import ALSModel
从hdfs加载之前存储的模型
als_model ALSModel.load(hdfs://localhost:8020/models/userCateRatingALSModel.obj)
model.recommendForAllUsers(N) 给用户推荐TOP-N个物品
result als_model.recommendForAllUsers(3)
result.show()显示结果:
|userId| recommendations|
| 148|[[3347, 12.547271…| | 463|[[1610, 9.250818]…| | 471|[[1610, 10.246621…| | 496|[[1610, 5.162216]…| | 833|[[5607, 9.065482]…| | 1088|[[104, 6.886987],…| | 1238|[[5631, 14.51981]…| | 1342|[[5720, 10.89842]…| | 1580|[[5731, 8.466453]…| | 1591|[[1610, 12.835257…| | 1645|[[1610, 11.968531…| | 1829|[[1610, 17.576496…| | 1959|[[1610, 8.353473]…| | 2122|[[1610, 12.652732…| | 2142|[[1610, 12.48068]…| | 2366|[[1610, 11.904813…| | 2659|[[5607, 11.699315…| | 2866|[[1610, 7.752719]…| | 3175|[[3347, 2.3429515…|
| 3749|[[1610, 3.641833]…|
only showing top 20 rows召回到redis import redis host 192.168.19.8 port 6379
召回到redis
def recall_cate_by_cf(partition):# 建立redis 连接池pool redis.ConnectionPool(hosthost, portport)# 建立redis客户端client redis.Redis(connection_poolpool)for row in partition:client.hset(recall_cate, row.userId, [i.cateId for i in row.recommendations])
对每个分片的数据进行处理 #mapPartition Transformation map
foreachPartition Action操作 foreachRDD
result.foreachPartition(recall_cate_by_cf)# 注意这里这是召回的是用户最感兴趣的n个类别
总的条目数查看redis中总的条目数是否一致
result.count()显示结果: 11363402.3 根据用户对品牌偏好打分训练ALS模型 from pyspark.sql.types import StructType, StructField, StringType, IntegerTypeschema StructType([StructField(userId, IntegerType()),StructField(brandId, IntegerType()),StructField(pv, IntegerType()),StructField(fav, IntegerType()),StructField(cart, IntegerType()),StructField(buy, IntegerType()) ])
从hdfs加载预处理好的品牌的统计数据
brand_count_df spark.read.csv(hdfs://localhost:8020/preprocessing_dataset/brand_count.csv, headerTrue, schemaschema)
brand_count_df.show()
def process_row®:# 处理每一行数据r表示row对象# 偏好评分规则# m: 用户对应的行为次数# 该偏好权重比例次数上限仅供参考具体数值应根据产品业务场景权衡# pv: if m20: score0.2*m; else score4# fav: if m20: score0.4*m; else score8# cart: if m20: score0.6*m; else score12# buy: if m20: score1*m; else score20# 注意这里要全部设为浮点数spark运算时对类型比较敏感要保持数据类型都一致pv_count r.pv if r.pv else 0.0fav_count r.fav if r.fav else 0.0cart_count r.cart if r.cart else 0.0buy_count r.buy if r.buy else 0.0pv_score 0.2*pv_count if pv_count20 else 4.0fav_score 0.4*fav_count if fav_count20 else 8.0cart_score 0.6*cart_count if cart_count20 else 12.0buy_score 1.0*buy_count if buy_count20 else 20.0rating pv_score fav_score cart_score buy_score# 返回用户ID、品牌ID、用户对品牌的偏好打分return r.userId, r.brandId, rating
用户对品牌的打分数据
brand_rating_df brand_count_df.rdd.map(process_row).toDF([userId, brandId, rating])
brand_rating_df.show()基于Spark的ALS隐因子模型进行CF评分预测 ALS的意思是交替最小二乘法Alternating Least Squares是Spark中进行基于模型的协同过滤model-based CF的推荐系统算法也是目前Spark内唯一一个推荐算法。 同SVD它也是一种矩阵分解技术但理论上ALS在海量数据的处理上要优于SVD。 更多了解pyspark.ml.recommendation.ALS 注意由于数据量巨大因此这里不考虑基于内存的CF算法 参考为什么Spark中只有ALS 使用pyspark中的ALS矩阵分解方法实现CF评分预测
使用pyspark中的ALS矩阵分解方法实现CF评分预测
文档地址https://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlightvectors#module-pyspark.ml.recommendation
from pyspark.ml.recommendation import ALSals ALS(userColuserId, itemColbrandId, ratingColrating, checkpointInterval2)
利用打分数据训练ALS模型
此处训练时间较长
model als.fit(brand_rating_df)
model.recommendForAllUsers(N) 给用户推荐TOP-N个物品
model.recommendForAllUsers(3).show()
将模型进行存储
model.save(hdfs://localhost:9000/models/userBrandRatingModel.obj)
测试存储的模型
from pyspark.ml.recommendation import ALSModel
从hdfs加载模型
my_model ALSModel.load(hdfs://localhost:9000/models/userBrandRatingModel.obj) my_model
model.recommendForAllUsers(N) 给用户推荐TOP-N个物品
my_model.recommendForAllUsers(3).first()
- 上一篇: 网站做3年3年包括什么软件吗100款应用软件安装入口
- 下一篇: 网站做301对优化有影响经营网站备案
相关文章
-
网站做3年3年包括什么软件吗100款应用软件安装入口
网站做3年3年包括什么软件吗100款应用软件安装入口
- 技术栈
- 2026年03月21日
-
网站作品集南京企业官网建设
网站作品集南京企业官网建设
- 技术栈
- 2026年03月21日
-
网站左侧悬浮导航好看的网站排版
网站左侧悬浮导航好看的网站排版
- 技术栈
- 2026年03月21日
-
网站做301对优化有影响经营网站备案
网站做301对优化有影响经营网站备案
- 技术栈
- 2026年03月21日
-
网站做301需要备案吗久久建筑网是山东省的吗
网站做301需要备案吗久久建筑网是山东省的吗
- 技术栈
- 2026年03月21日
-
网站做app的好处html5移动端网站开发教程
网站做app的好处html5移动端网站开发教程
- 技术栈
- 2026年03月21日
