十年網(wǎng)站開發(fā)經(jīng)驗(yàn) + 多家企業(yè)客戶 + 靠譜的建站團(tuán)隊(duì)
量身定制 + 運(yùn)營維護(hù)+專業(yè)推廣+無憂售后,網(wǎng)站問題一站解決
本篇文章為大家展示了在Spark Streaming job中如何讀取Kafka messages及其offsetRange,內(nèi)容簡明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。

創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),紅安企業(yè)網(wǎng)站建設(shè),紅安品牌網(wǎng)站建設(shè),網(wǎng)站定制,紅安網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,紅安網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競爭力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。
在Spark Streaming job中讀取Kafka topic(s)中的messages時(shí),有時(shí)我們會(huì)需要同步記錄下每次讀取的messages的offsetRange。要達(dá)到這一目的,下面這兩段代碼(代碼1和代碼2)都是正確的,而且是等價(jià)的。
代碼1(正確):
-----------------------
JavaPairInputDStream
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.foreachRDD(
new Function
@Override
public Void call(JavaPairRDD
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
JavaRDD
long msgNum = processEachRDD(valueRDD, outputFolderPath, definedDuration);
if (msgNum > 0 && zkPathRoot!= null) {
writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
}
return null;
}
});
代碼2(正確):
-----------------------
JavaPairInputDStream
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
final AtomicReference
lines = messages.transformToPair(new Function
@Override
public JavaPairRDD
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
return rdd;
}
}).map(new Function
@Override
public String call(Tuple2
return tuple2._2();
}
});
lines.foreachRDD(new Function
@Override
public Void call(JavaRDD
long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
if (msgNum > 0 && zkPathRoot!= null) {
OffsetRange[] offsets = offsetRanges.get();
writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
}
return null;
}
});
但是要注意,下面這兩段代碼(代碼3和代碼4)是錯(cuò)誤的,它們都會(huì)拋出一個(gè)exception:java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
代碼3(錯(cuò)誤):
-----------------------
JavaPairInputDStream
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.transform(new Function
@Override
public JavaRDD
return rdd.values();
}
}).foreachRDD(new Function
@Override
public Void call(JavaRDD
long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
if (msgNum > 0 && zkPathRoot!= null) {
OffsetRange[] offsets = offsetRanges.get();
writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
}
return null;
}
});
代碼4(錯(cuò)誤):
-----------------------
JavaPairInputDStream
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.map(new Function
@Override
public String call(Tuple2
return tuple2._2();
}
}).foreachRDD(new Function
@Override
public Void call(JavaRDD
long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
if (msgNum > 0 && zkPathRoot!= null) {
OffsetRange[] offsets = offsetRanges.get();
writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
}
return null;
}
});
上述內(nèi)容就是在Spark Streaming job中如何讀取Kafka messages及其offsetRange,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。