亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原創

Spark Streaming開發中的一些優化點

2023-05-29 07:03:20
12
0

Spark String 是使用最廣泛的實時處理框架之一,其他包括Apache Flink, Apache Storm, Kafka Streams。
Spark Streamming 相對于其他有更對的性能問題,它的處理是通過時間窗口而不是逐個事件,會導致延遲發生。
良好的配置和開發有有時候會使性能提高10到20倍,
分享一些可以優化的配置:


一. Kafka的直接實現


1) Spark2 的版本中有兩種方式接收kafka數據,一種是兼容kafka0.8.x,另外一種是direct模式,
其中第一種方式缺乏并行性且不兼容TLS, 如果需要同時并行讀取多個topic,就需要使用direct模式
第一種方式:
  val kafkaStream = KafkaUtils.createStream(
         streamingContext,
         [ZK quorum],
         [consumer group id],
         [per-topic number of Kafka partitions to consume]
    )
direct模式并不關心數據是否來自一個或多個topic
     val kafkaStream = KafkaUtils.createDirectStream[String, String]
(ssc,locationStrategy, consumerStrategy)


二,offset 偏移量管理


offset偏移量指示分配給spark消費者的數據從何處讀取,這個非常重要,這個保證了推流過程中的HA,
避免出現錯誤時丟失數據。
在kafka 0.10.x和spark 2 中offset的管理是通過kafka,為了防止在數據處理過程中
丟失數據,一般有以下幾種方式:
1.使用checkpointing
      val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
2.
在代碼中自己管理offset
  stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      
      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }


三,分區管理


在kafka中對即將接收的topic進行分區消費非常重要,進行分區可以運行在不同的spark執行器中
并行接收數據。
以下是創建三個程序并行分別使用兩個cpu和2gb內存
./bin/kafka-topics.sh --create --replication-factor 1 --partitions 3 --zookeeper zookeeper:2181 
--topic topicname

 conf.set("spark.cores.max", 6)
 conf.set("spark.executor.cores", 2)
 conf.set("spark.executor.memory", 2GB)


四, spark streaming的一些優化配置


1. 設置poll的超時時間
在spark2中默認的超時時間是512ms, 如果有很多任務是“Task failed”則需要
增加該值的value繼續觀察
    conf.set("spark.streaming.kafka.consumer.poll.ms", 512)
2, 系列化
使用序列化在kafka中非常重要,你可以使用默認的序列化函數
     val serializers = Map( 
         "key.deserializer" -> classOf[StringDeserializer],
         "value.deserializer" -> classOf[RowDeserializer] 
     ) 
當然你也可以使用kryo進行序列化
      conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      conf.registerKryoClasses(Array(classOf[Foo], classOf[Var]))
      conf.set("spark.kryo.registrationRequired", "true")

 

0條評論
作者已關閉評論
吳****軍
4文章數
0粉絲數
吳****軍
4 文章 | 0 粉絲
原創

Spark Streaming開發中的一些優化點

2023-05-29 07:03:20
12
0

Spark String 是使用最廣泛的實時處理框架之一,其他包括Apache Flink, Apache Storm, Kafka Streams。
Spark Streamming 相對于其他有更對的性能問題,它的處理是通過時間窗口而不是逐個事件,會導致延遲發生。
良好的配置和開發有有時候會使性能提高10到20倍,
分享一些可以優化的配置:


一. Kafka的直接實現


1) Spark2 的版本中有兩種方式接收kafka數據,一種是兼容kafka0.8.x,另外一種是direct模式,
其中第一種方式缺乏并行性且不兼容TLS, 如果需要同時并行讀取多個topic,就需要使用direct模式
第一種方式:
  val kafkaStream = KafkaUtils.createStream(
         streamingContext,
         [ZK quorum],
         [consumer group id],
         [per-topic number of Kafka partitions to consume]
    )
direct模式并不關心數據是否來自一個或多個topic
     val kafkaStream = KafkaUtils.createDirectStream[String, String]
(ssc,locationStrategy, consumerStrategy)


二,offset 偏移量管理


offset偏移量指示分配給spark消費者的數據從何處讀取,這個非常重要,這個保證了推流過程中的HA,
避免出現錯誤時丟失數據。
在kafka 0.10.x和spark 2 中offset的管理是通過kafka,為了防止在數據處理過程中
丟失數據,一般有以下幾種方式:
1.使用checkpointing
      val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
2.
在代碼中自己管理offset
  stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      
      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }


三,分區管理


在kafka中對即將接收的topic進行分區消費非常重要,進行分區可以運行在不同的spark執行器中
并行接收數據。
以下是創建三個程序并行分別使用兩個cpu和2gb內存
./bin/kafka-topics.sh --create --replication-factor 1 --partitions 3 --zookeeper zookeeper:2181 
--topic topicname

 conf.set("spark.cores.max", 6)
 conf.set("spark.executor.cores", 2)
 conf.set("spark.executor.memory", 2GB)


四, spark streaming的一些優化配置


1. 設置poll的超時時間
在spark2中默認的超時時間是512ms, 如果有很多任務是“Task failed”則需要
增加該值的value繼續觀察
    conf.set("spark.streaming.kafka.consumer.poll.ms", 512)
2, 系列化
使用序列化在kafka中非常重要,你可以使用默認的序列化函數
     val serializers = Map( 
         "key.deserializer" -> classOf[StringDeserializer],
         "value.deserializer" -> classOf[RowDeserializer] 
     ) 
當然你也可以使用kryo進行序列化
      conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      conf.registerKryoClasses(Array(classOf[Foo], classOf[Var]))
      conf.set("spark.kryo.registrationRequired", "true")

 

文章來自個人專欄
文章 | 訂閱
0條評論
作者已關閉評論
作者已關閉評論
0
0