【王喆-推薦系統】特征工程篇-(task2)用Spark進行特征處理
學習心得

(1)本次task學習了推薦系統中特征處理的主要方式,并利用 Spark 實踐了類別型特征和數值型特征的主要處理方法,深度學習和傳統機器學習的區別并不大,TensorFlow、PyTorch 等深度學習平臺也提供了類似的特征處理函數。
(2)其中幾個特征處理API:
Normalizer,是范式歸一化操作,保證歸一化之后范式為1
StandardScaler,是標準差歸一化操作,保證歸一化之后均值為0標準差為1
RobustScaler,是使用分位數進行魯棒歸一化操作,可以有效減少異常值的干擾
MinMaxScaler,是使用最大值和最小值進行歸一化操作。
(3)Spark 的計算過程:Stage 內部數據高效并行計算,Stage 邊界處進行消耗資源的 shuffle 操作或者最終的 reduce 操作。
注意:OneHotEncoderEstimator()在PySpark 3.0.0及以上版本已經更改為 OneHotEncoder()。
文章目錄
學習心得
零、背景引入
一、業界主流的大數據處理利器:Spark
1.1 Spark原理
1.2 一個具體栗子
二、如何利用 One-hot 編碼處理類別型特征
2.1 One-hot編碼
2.2 Sparrow系統栗子
2.3 Multiple編碼
三、數值型特征的處理——歸一化和分桶
3.1 解決特征的尺度相差過大
3.2 解決特征分布不均勻問題
3.3 YouTube的數值型特征處理
四、作業
五、答疑
Reference
零、背景引入
上次task學習了推薦系統要使用的常用特征——基本分為“用戶行為”、“用戶關系”、“屬性標簽”、“內容數據”、“場景信息”這五個類別。但這些原始的特征是無法直接提供給推薦模型使用的,因為
推薦模型本質上是一個函數,輸入輸出都是數字或數值型的向量
。像動作、喜劇、愛情、科幻這些電影風格,是怎么轉換成數值供推薦模型使用的呢?用戶的行為歷史又是怎么轉換成數值特征的呢?
類似的特征處理過程在數據量變大之后還會變得更加復雜,因為工業界的數據集往往都是 TB 甚至 PB 規模的,這在單機上肯定是沒法處理的。那業界又是怎樣進行海量數據的特征處理呢?
一、業界主流的大數據處理利器:Spark
1.1 Spark原理
Spark是業界主流的大數據處理利器。
分布式:指的是計算節點之間不共享內存,需要通過網絡通信的方式交換數據。
Spark 是一個分布式計算平臺。Spark 最典型的應用方式就是建立在大量廉價的計算節點上,這些節點可以是廉價主機,也可以是虛擬的 Docker Container(Docker 容器)。
Spark 的架構圖中:
Spark 程序由 Manager Node(管理節點)進行調度組織
由 Worker Node(工作節點)進行具體的計算任務執行
最終將結果返回給 Drive Program(驅動程序)。
在物理的 Worker Node 上,數據還會分為不同的 partition(數據分片),可以說 partition 是 Spark 的基礎數據單元。
Spark 計算集群能夠比傳統的單機高性能服務器具備更強大的計算能力,就是由這些成百上千,甚至達到萬以上規模的工作節點并行工作帶來的。
1.2 一個具體栗子
那在執行一個具體任務的時候,Spark 是怎么協同這么多的工作節點,通過并行計算得出最終的結果呢?這里我們用一個任務來解釋一下 Spark 的工作過程。
一個具體任務過程:
(1)先從本地硬盤讀取文件 textFile;
(2)再從分布式文件系統 HDFS 讀取文件 hadoopFile;
(3)然后分別對它們進行處理;
(4)再把兩個文件按照 ID 都 join 起來得到最終的結果。
在 Spark 平臺上處理這個任務的時候,會將這個任務拆解成一個子任務 DAG(Directed Acyclic Graph,有向無環圖),再根據 DAG 決定程序各步驟執行的方法。從圖 2 中可以看到,這個 Spark 程序分別從 textFile 和 hadoopFile 讀取文件,再經過一系列 map、filter 等操作后進行 join,最終得到了處理結果。
最關鍵的過程是要理解
哪些是可以純并行處理的部分,哪些是必須 shuffle(混洗)和 reduce 的部分
:這里的 shuffle 指的是所有 partition 的數據必須進行洗牌后才能得到下一步的數據,最典型的操作就是圖 2 中的 groupByKey 操作和 join 操作。以 join 操作為例,必須對 textFile 數據和 hadoopFile 數據做全量的匹配才可以得到 join 后的 dataframe(Spark 保存數據的結構)。而 groupByKey 操作則需要對數據中所有相同的 key 進行合并,也需要全局的 shuffle 才能完成。
與之相比,map、filter 等操作僅需要逐條地進行數據處理和轉換,不需要進行數據間的操作,因此各 partition 之間可以完全并行處理。
在得到最終的計算結果之前,程序需要進行 reduce 的操作,從各 partition 上匯總統計結果,隨著 partition 的數量逐漸減小,reduce 操作的并行程度逐漸降低,直到將最終的計算結果匯總到 master 節點(主節點)上。可以說,shuffle 和 reduce 操作的觸發決定了純并行處理階段的邊界。
注意:
(1)
shuffle 操作需要在不同計算節點之間進行數據交換,非常消耗計算、通信及存儲資源
,因此 shuffle 操作是 spark 程序應該盡量避免的。
shuffle可以理解為一個串行操作,需要等到在此之前的并行工作完成之后才可以順序開始。
(2)簡述Spark 的計算過程:Stage 內部數據高效并行計算,Stage 邊界處進行消耗資源的 shuffle 操作或者最終的 reduce 操作。
下面將應用Spark在推薦系統的特征處理上,用 Spark 處理我們的 Sparrow Recsys 項目的數據集。帶著2個問題學習: 經典的特征處理方法有什么?Spark 是如何實現這些特征處理方法的?
二、如何利用 One-hot 編碼處理類別型特征
2.1 One-hot編碼
廣義上來講,所有的特征都可以分為兩大類:
(1)第一類是類別、ID 型特征(以下簡稱類別型特征)。
拿電影推薦來說,電影的風格、ID、標簽、導演演員等信息,用戶看過的電影 ID、用戶的性別、地理位置信息、當前的季節、時間(上午,下午,晚上)、天氣等等,這些無法用數字表示的信息全都可以被看作是類別、ID 類特征。
(2)第二類是數值型特征,能用數字直接表示的特征就是數值型特征。
典型的包括用戶的年齡、收入、電影的播放時長、點擊量、點擊率等。
特征處理的目的:把所有的特征全部轉換成一個數值型的特征向量,對于數值型特征,這個過程非常簡單,直接把這個數值放到特征向量上相應的維度上就可以了。但是對于類別、ID 類特征,怎么處理它們呢?
這里就要用到 One-hot 編碼(也被稱為獨熱編碼),它是
將類別、ID 型特征轉換成數值向量的一種最典型的編碼方式
。它通過把所有其他維度置為 0,單獨將當前類別或者 ID 對應的維度置為 1 的方式生成特征向量。
One-hot編碼舉栗:假設某樣本有三個特征,分別是星期、性別和城市,我們用 [Weekday=Tuesday, Gender=Male, City=London] 來表示,用 One-hot 編碼對其進行數值化的結果。
除了上面栗子的類別型特征外,ID 型特征也經常使用 One-hot 編碼。
比如,在 SparrowRecsys 中,用戶 U 觀看過電影 M,這個行為是一個非常重要的用戶特征,如何向量化這個行為呢?其實也是使用 One-hot 編碼。假設,我們的電影庫中一共有 1000 部電影,電影 M 的 ID 是 310(編號從 0 開始),那這個行為就可以用一個 1000 維的向量來表示,讓第 310 維的元素為 1,其他元素都為 0。
2.2 Sparrow系統栗子
下面看看 SparrowRecsys 是如何利用 Spark 完成這一過程的。這里使用 Spark 的機器學習庫 MLlib 來完成 One-hot 特征的處理。
其中,最主要的步驟是,先創建一個負責 One-hot 編碼的轉換器,OneHotEncoderEstimator,然后通過它的 fit 函數完成指定特征的預處理,并利用 transform 函數將原始特征轉換成 One-hot 特征。實現思路大體上就是這樣,具體的步驟可以參考下面給出的源碼:
def oneHotEncoderExample(samples:DataFrame): Unit ={ //samples樣本集中的每一條數據代表一部電影的信息,其中movieId為電影id val samplesWithIdNumber = samples.withColumn("movieIdNumber", col("movieId").cast(sql.types.IntegerType)) //利用Spark的機器學習庫Spark MLlib創建One-hot編碼器 val oneHotEncoder = new OneHotEncoderEstimator() .setInputCols(Array("movieIdNumber")) .setOutputCols(Array("movieIdVector")) .setDropLast(false) //訓練One-hot編碼器,并完成從id特征到One-hot向量的轉換 val oneHotEncoderSamples = oneHotEncoder.fit(samplesWithIdNumber).transform(samplesWithIdNumber) //打印最終樣本的數據結構 oneHotEncoderSamples.printSchema() //打印10條樣本查看結果 oneHotEncoderSamples.show(10) _(參考 com.wzhe.sparrowrecsys.offline.spark.featureeng.FeatureEngineering__中的oneHotEncoderExample函數)_
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
One-hot 編碼也可以自然衍生成 Multi-hot 編碼(多熱編碼)。比如,對于歷史行為序列類、標簽特征等數據來說,用戶往往會與多個物品產生交互行為,或者
一個物品被打上多個標簽,這時最常用的特征向量生成方式就是把其轉換成 Multi-hot 編碼
。
在 SparrowRecsys 中,因為每個電影都是有多個 Genre(風格)類別的,所以我們就可以用 Multi-hot 編碼完成標簽到向量的轉換。
可以嘗試著用 Spark 實現該過程,也可以參考 SparrowRecsys 項目中 multiHotEncoderExample 的實現。
2.3 Multiple編碼
Multiple編碼特征將多個屬性同時編碼到一個特征中。在推薦場景中,單個用戶對哪些物品感興趣的特征就是一種Multiple編碼特征,如,表示某用戶對產品1、產品2、產品3、產品4是否感興趣,則這個特征可能有多個取值,如用戶A對產品1和產品2感興趣,用戶B對產品1和產品4感興趣,用戶C對產品1、產品3和產品4感興趣,則用戶興趣特征為
用戶 UserInterests A [1, 2] B [1, 4] C [1, 3, 4]
1
2
3
4
Multiple編碼采用類似oneHot編碼的形式進行編碼,根據物品種類數目,展成物品種類數目大小的向量,當某個用戶感興趣時,對應維度為1,反之為0,如下
用戶 UserInterests A [1, 1, 0, 0] B [1, 0, 0, 1] C [1, 0, 1, 1]
1
2
3
4
如何使用Multiple編碼呢?
我們將多個屬性同時編碼到同一個特征中,目的就是同時利用多個屬性的特征。經過Multiple編碼后的特征大小為[batch_size, num_items],記作U,構建物品items的Embedding矩陣,該矩陣維度為[num_items, embedding_size],記作V,將矩陣U和矩陣V相乘,我們就得到了大小為[batch_size, embedding_size]的多屬性表示。
三、數值型特征的處理——歸一化和分桶
為啥處理特征:一是特征的尺度,二是特征的分布。
3.1 解決特征的尺度相差過大
前者即防止特征尺度之間相距過大,比如在電影推薦中有兩個特征,一個是電影的評價次數 fr(無上限),一個是電影的平均評分 fs。fr波動范圍高于fs幾個數量級,可能會完全掩蓋fs作用,所以將兩個特征尺度拉平到一個區域內(通常是[0, 1],即所謂的歸一化)。
3.2 解決特征分布不均勻問題
歸一化雖然能夠解決特征取值范圍不統一的問題,但無法改變特征值的分布。比如圖 5 就顯示了 Sparrow Recsys 中編號在前 1000 的電影平均評分分布。由于人們打分有“中庸偏上”的傾向,因此評分大量集中在 3.5 的附近,而且越靠近 3.5 的密度越大。這對于模型學習來說也不是一個好的現象,因為
特征的區分度并不高
。
經常會用分桶的方式來解決特征值分布極不均勻的問題。
分桶(Bucketing)
:將樣本按照某特征的值從高到低排序,然后按照桶的數量找到分位數,將樣本分到各自的桶中,再用桶 ID 作為特征值。
在 Spark MLlib 中,分別提供了兩個轉換器 MinMaxScaler 和 QuantileDiscretizer,來進行歸一化和分桶的特征處理。它們的使用方法和之前介紹的 OneHotEncoderEstimator 一樣,都是先用 fit 函數進行數據預處理,再用 transform 函數完成特征轉換。下面的代碼就是 SparrowRecSys 利用這兩個轉換器完成特征歸一化和分桶的過程。
def ratingFeatures(samples:DataFrame): Unit ={ samples.printSchema() samples.show(10) //利用打分表ratings計算電影的平均分、被打分次數等數值型特征 val movieFeatures = samples.groupBy(col("movieId")) .agg(count(lit(1)).as("ratingCount"), avg(col("rating")).as("avgRating"), variance(col("rating")).as("ratingVar")) .withColumn("avgRatingVec", double2vec(col("avgRating"))) movieFeatures.show(10) //分桶處理,創建QuantileDiscretizer進行分桶,將打分次數這一特征分到100個桶中 val ratingCountDiscretizer = new QuantileDiscretizer() .setInputCol("ratingCount") .setOutputCol("ratingCountBucket") .setNumBuckets(100) //歸一化處理,創建MinMaxScaler進行歸一化,將平均得分進行歸一化 val ratingScaler = new MinMaxScaler() .setInputCol("avgRatingVec") .setOutputCol("scaleAvgRating") //創建一個pipeline,依次執行兩個特征處理過程 val pipelineStage: Array[PipelineStage] = Array(ratingCountDiscretizer, ratingScaler) val featurePipeline = new Pipeline().setStages(pipelineStage) val movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures) //打印最終結果 movieProcessedFeatures.show( _(參考 com.wzhe.sparrowrecsys.offline.spark.featureeng.FeatureEngineering中的ratingFeatures函數)_
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
3.3 YouTube的數值型特征處理
在經典的 YouTube 深度推薦模型中,可以看到一些很有意思的處理方法。比如,在處理觀看時間間隔(time since last watch)和視頻曝光量(previous impressions)這兩個特征時,YouTube 模型對它們進行歸一化后,又將它們各自處理成了三個特征(圖 6 中紅框內的部分),分別是原特征值 x,特征值的平方x^2,以及特征值的開方,這又是為什么呢?
無論是平方還是開方操作,改變的還是這個特征值的分布,這些操作與分桶操作一樣,都是希望
通過改變特征的分布,讓模型能夠更好地學習到特征內包含的有價值信息
。但由于我們沒法通過人工的經驗判斷哪種特征處理方式更好,所以索性把它們都輸入模型,讓模型來做選擇。
四、作業
(1)請你查閱一下 Spark MLlib 的編程手冊,找出 Normalizer、StandardScaler、RobustScaler、MinMaxScaler 這個幾個特征處理方法有什么不同。
Normalizer、StandardScaler、RobustScaler、MinMaxScaler 都是用讓數據無量綱化
Normalizer: 正則化;(和Python的sklearn一樣是按行處理,而不是按列[每一列是一個特征]處理,原因是:Normalization主要思想是對每個樣本計算其p-范數,然后對該樣本中每個元素除以該范數,這樣處理的結果是使得每個處理后樣本的p-范數(l1-norm,l2-norm)等于1。)針對每行樣本向量:l1: 每個元素/樣本中每個元素絕對值的和,l2: 每個元素/樣本中每個元素的平方和開根號,lp: 每個元素/每個元素的p次方和的p次根,默認用l2范數。
StandardScaler:數據標準化; ( x i ? u ) / σ (xi - u) / σ (xi?u)/σ 【u:均值,σ:方差】當數據(x)按均值(μ)中心化后,再按標準差(σ)縮放,數據就會服從為均值為0,方差為1的正態分布(即標準正態分布)。
RobustScaler: ( x i ? m e d i a n ) / I Q R (xi - median) / IQR (xi?median)/IQR 【median是樣本的中位數,IQR是樣本的 四分位距:根據第1個四分位數和第3個四分位數之間的范圍來縮放數據】
MinMaxScaler:數據歸一化, ( x i ? m i n ( x ) ) / ( m a x ( x ) ? m i n ( x ) ) (xi - min(x)) / (max(x) - min(x)) (xi?min(x))/(max(x)?min(x)) ;當數據(x)按照最小值中心化后,再按極差(最大值 - 最小值)縮放,數據移動了最小值個單位,并且會被收斂到 [0,1]之間
(2)你能試著運行一下 SparrowRecSys 中的 FeatureEngineering 類,從輸出的結果中找出,到底哪一列是我們處理好的 One-hot 特征和 Multi-hot 特征嗎?以及這兩個特征是用 Spark 中的什么數據結構來表示的呢?
答:One-hot特征是調用OneHotEncoderEstimator對movieId轉換,生成了特征movieIdVector;
Multi-hot 特征是調用Vectors.sparse方法,對處理后的genreIndexes轉換,生成vector。
這倆個特征都是稀疏向量表示(數據結構:SparseVector),不是稠密向量。
其中的數據分別是:(類別數量,索引數組,值數組)。索引數組長度必須等于值數組長度。
五、答疑
(1)對訓練數據進行平方或者開方,是為了改變訓練數據的分布。訓練數據的分布被改變后,訓練出來的模型豈不是不能正確擬合訓練數據了。
答:本質上是改變了特征的分布,特征的分布和訓練數據的分布沒有本質的聯系。只要你不改變訓練數據label的分布,最終預測出的結果都應該是符合數據本身分布的。因為你要預測的是label,并不是特征本身。
Reference
(1)《深度學習推薦系統實戰》,王喆
(2)王喆大佬的github:https://github.com/wzhe06
(3)Machine Learning Library (MLlib)
(4)https://www.codeleading.com/article/97252516619/#_OneHot_19
(5)http://spark.apache.org/docs/3.0.0/api/python/index.html
spark 推薦系統
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。