spark streaming連接kafka引發(fā)"partition.assignment.strategy"異常處理

      網(wǎng)友投稿 1004 2025-04-02

      服務(wù)器運(yùn)行環(huán)境:Spark 2.4.4 + scall?2.11.12 + Kafka 2.2.2


      由于業(yè)務(wù)相對簡單,Kafka只有固定topics,所以一直使用下面腳本執(zhí)行實(shí)時流計算

      Spark-submit?--packages?org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4?--py-files?/data/service/xxx.zip?/data/service/xxx.py

      代碼中使用pyspark.streaming.kafka的KafkaUtils來創(chuàng)建spark?streaming與kafka的連接,運(yùn)行了好長時間都沒有出現(xiàn)過問題

      spark streaming連接kafka引發(fā)"partition.assignment.strategy"異常處理

      隨著新業(yè)務(wù)接入,在新功能中kafka需要使用動態(tài)topics方式,要用到正則表達(dá)式,查了KafkaUtils源碼和相關(guān)資料,發(fā)現(xiàn)它不支持動態(tài)topics方式,需要使用spark-streaming-kafka-0-10才能支持

      查看文檔http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html?與?http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html?后,使用結(jié)構(gòu)化流structured-streaming來實(shí)現(xiàn)

      實(shí)現(xiàn)代碼:

      import?sysfrom?pyspark.sql?import?SparkSessiondef?process_row(row):????#?Write?row?to?storage ????passif?__name__?==?"__main__":????if?len(sys.argv)?!=?4:????????print(""" ????????Usage:?structured_kafka_wordcount.py???????????""",?file=sys.stderr) ????????sys.exit(-1) ????bootstrapServers?=?sys.argv[1] ????subscribeType?=?sys.argv[2] ????topics?=?sys.argv[3] ????spark?=?SparkSession\ ????????.builder\ ????????.appName("StructuredKafkaWordCount")\ ????????.getOrCreate()????#?Create?DataSet?representing?the?stream?of?input?lines?from?kafka ????ds?=?spark\ ????????.readStream\ ????????.format("kafka")\ ????????.option("kafka.bootstrap.servers",?bootstrapServers)\ ????????.option(subscribeType,?topics)\ ????????.load()\ ????????.selectExpr("CAST(value?AS?STRING)") ????ds.printSchema() ????query?=?ds.writeStream.foreach(process_row).start() ????query.awaitTermination()

      執(zhí)行提交任務(wù)命令

      spark-submit?--packages?org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4?/data/service/demo.py?master:9092?subscribePattern?event.log.*

      提交后一直報下面錯誤

      org.apache.kafka.common.config.ConfigException:? Missing?required?configuration?"partition.assignment.strategy"?which?has?no?default?value

      查了好多資料,都說需要添加參數(shù),配置Kafka分區(qū)分配策略,并將readStream修改為:

      ds?=?spark\ ????????.readStream\ ????????.format("kafka")\ ????????.option("kafka.bootstrap.servers",?bootstrapServers)\ ????????.option("kafka.partition.assignment.strategy",?"range")\ ????????.option(subscribeType,?topics)\ ????????.load()

      再次運(yùn)行異常信息改為無法連接kafka了,弄了整整一天人都快崩潰了還沒搞定

      還好最終查找https://xbuba.com/questions/44959483,大牛提示說有可能是kafka0.8版本的jar與kafka0.10的jar沖突原因造成的

      使用命令查找

      find?/?-name?'spark-streaming-kafka*' find?/?-name?'spark-sql-kafka*'

      發(fā)現(xiàn)在/root/.ivy2/cache/org.apache.spark/?目錄下面存在spark-streaming-kafka-0-8_2.11?與?spark-sql-kafka-0-10_2.11?文件夾和相關(guān)的jar文件

      將spark-streaming-kafka-0-8_2.11刪除后執(zhí)行代碼就正常運(yùn)行了

      由于老腳本用的還是kafka0.8,為了兼容兩個版本能同時運(yùn)行,需要將/root/.ivy2/cache/org.apache.spark/?目錄下面kafka0.8與kafka0.10兩個版本的jar全部清除

      然后登錄https://repo1.maven.org/maven2/org/apache/spark/?下載spark-streaming-kafka-0-8與spark-sql-kafka-0-10對應(yīng)的jar下來,并將提交命令spark-submit的參數(shù)改為:

      spark-submit?--jars?/data/service/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar?--py-files?/data/service/xxx.zip?/data/service/xxx.py spark-submit?--jars?/data/service/spark-sql-kafka-0-10_2.11-2.4.4.jar?/data/service/demo.py?master:9092?subscribePattern?event.log.*

      修改后兩個腳本運(yùn)行都沒有問題(PS:老腳本原想直接用org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4包來啟動,執(zhí)行后直接暴錯,提示說要改為org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4才行)

      Spark

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:歸納整理XLOOKUP函數(shù)用法
      下一篇:關(guān)于 CPU
      相關(guān)文章
      亚洲区小说区激情区图片区| 亚洲av一综合av一区| 亚洲国产成人久久综合碰碰动漫3d | 亚洲精品无码鲁网中文电影| 亚洲日本一区二区一本一道| 国产成人高清亚洲一区久久 | 亚洲色大成网站www永久男同| 亚洲Av高清一区二区三区| 亚洲国产成人91精品| 亚洲精品福利在线观看| 亚洲精品福利在线观看| 亚洲国产成人精品久久 | 亚洲国产高清在线一区二区三区| 亚洲精品免费观看| 亚洲AV日韩AV永久无码绿巨人 | 含羞草国产亚洲精品岁国产精品| 亚洲精品资源在线| 亚洲精品91在线| 亚洲人成网站在线观看播放青青| 亚洲AV无码一区二区三区在线| 亚洲一区精品视频在线| 亚洲黄色激情视频| 亚洲精品无码久久久久A片苍井空 亚洲精品无码久久久久YW | 亚洲三级中文字幕| 亚洲综合精品第一页| 亚洲综合无码无在线观看| 久久亚洲欧美国产精品| 国产成人亚洲综合a∨| 亚洲视频一区二区| 亚洲精品无码午夜福利中文字幕| 亚洲国产精彩中文乱码AV| 亚洲国产精品婷婷久久| 亚洲理论精品午夜电影| 亚洲色欲色欲www| 亚洲国产成人手机在线观看| 亚洲国产综合精品中文字幕 | 亚洲成AV人在线观看天堂无码| 久久久无码精品亚洲日韩蜜臀浪潮| 亚洲国产精品一区| 亚洲人色大成年网站在线观看| 亚洲欧洲日产国码久在线|