怎么回事呀?(乳頭硬疼是怎么回事呀)
1004
2025-04-02
服務(wù)器運(yùn)行環(huán)境:Spark 2.4.4 + scall?2.11.12 + Kafka 2.2.2
由于業(yè)務(wù)相對簡單,Kafka只有固定topics,所以一直使用下面腳本執(zhí)行實(shí)時流計算
Spark-submit?--packages?org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4?--py-files?/data/service/xxx.zip?/data/service/xxx.py
代碼中使用pyspark.streaming.kafka的KafkaUtils來創(chuàng)建spark?streaming與kafka的連接,運(yùn)行了好長時間都沒有出現(xiàn)過問題
隨著新業(yè)務(wù)接入,在新功能中kafka需要使用動態(tài)topics方式,要用到正則表達(dá)式,查了KafkaUtils源碼和相關(guān)資料,發(fā)現(xiàn)它不支持動態(tài)topics方式,需要使用spark-streaming-kafka-0-10才能支持
查看文檔http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html?與?http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html?后,使用結(jié)構(gòu)化流structured-streaming來實(shí)現(xiàn)
實(shí)現(xiàn)代碼:
import?sysfrom?pyspark.sql?import?SparkSessiondef?process_row(row):????#?Write?row?to?storage ????passif?__name__?==?"__main__":????if?len(sys.argv)?!=?4:????????print(""" ????????Usage:?structured_kafka_wordcount.py?
執(zhí)行提交任務(wù)命令
spark-submit?--packages?org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4?/data/service/demo.py?master:9092?subscribePattern?event.log.*
提交后一直報下面錯誤
org.apache.kafka.common.config.ConfigException:? Missing?required?configuration?"partition.assignment.strategy"?which?has?no?default?value
查了好多資料,都說需要添加參數(shù),配置Kafka分區(qū)分配策略,并將readStream修改為:
ds?=?spark\ ????????.readStream\ ????????.format("kafka")\ ????????.option("kafka.bootstrap.servers",?bootstrapServers)\ ????????.option("kafka.partition.assignment.strategy",?"range")\ ????????.option(subscribeType,?topics)\ ????????.load()
再次運(yùn)行異常信息改為無法連接kafka了,弄了整整一天人都快崩潰了還沒搞定
還好最終查找https://xbuba.com/questions/44959483,大牛提示說有可能是kafka0.8版本的jar與kafka0.10的jar沖突原因造成的
使用命令查找
find?/?-name?'spark-streaming-kafka*' find?/?-name?'spark-sql-kafka*'
發(fā)現(xiàn)在/root/.ivy2/cache/org.apache.spark/?目錄下面存在spark-streaming-kafka-0-8_2.11?與?spark-sql-kafka-0-10_2.11?文件夾和相關(guān)的jar文件
將spark-streaming-kafka-0-8_2.11刪除后執(zhí)行代碼就正常運(yùn)行了
由于老腳本用的還是kafka0.8,為了兼容兩個版本能同時運(yùn)行,需要將/root/.ivy2/cache/org.apache.spark/?目錄下面kafka0.8與kafka0.10兩個版本的jar全部清除
然后登錄https://repo1.maven.org/maven2/org/apache/spark/?下載spark-streaming-kafka-0-8與spark-sql-kafka-0-10對應(yīng)的jar下來,并將提交命令spark-submit的參數(shù)改為:
spark-submit?--jars?/data/service/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar?--py-files?/data/service/xxx.zip?/data/service/xxx.py spark-submit?--jars?/data/service/spark-sql-kafka-0-10_2.11-2.4.4.jar?/data/service/demo.py?master:9092?subscribePattern?event.log.*
修改后兩個腳本運(yùn)行都沒有問題(PS:老腳本原想直接用org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4包來啟動,執(zhí)行后直接暴錯,提示說要改為org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4才行)
Spark
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。