Hướng dẫn are python filters faster? - Bộ lọc python có nhanh hơn không?

Tôi đang chạy

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
5 trên một giao dịch và muốn lưu trữ kết quả theo một chuỗi (tôi cần một chuỗi để tôi có thể sử dụng
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
6 trên nó). Tôi nhận thấy rằng việc tạo một tập hợp từ một đối tượng bộ lọc nhanh hơn rất nhiều so với việc tạo danh sách hoặc một tuple. Tại sao vậy? Trước tiên tôi mặc dù loại bộ lọc là một kiểu con của tập hợp, điều này sẽ giải thích điều này, nhưng hàm
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
5 thực sự giống hệt với biểu thức máy phát, vì vậy nó thực sự không thể là một tập hợp bên trong.

Show

Tôi đã chạy thử nghiệm sau để kiểm tra tốc độ:

import time

def test ( n, seq ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( seq )
        print( method.__name__, ( time.time() - t ) )

someFilter = filter( lambda x: x % 3 == 0, range( 1000 ) )
test( 10000000, someFilter )

Và kết quả đã được nói rõ ràng khi sử dụng một bộ:

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274

Vậy tại sao việc tạo một bộ từ bộ lọc nhanh hơn nhiều? Nó thường mất nhiều thời gian để tạo một bộ từ một chuỗi, trong đó mọi yếu tố phải được băm? Hay là bằng cách nào đó nhận được một sự thúc đẩy từ biểu diễn bộ lọc nội bộ?

Để so sánh, khi chạy thử nghiệm trên biểu thức

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
8,
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
9 mất khoảng gấp đôi so với
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
0 và
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
1 (cả hai đều có tốc độ gần giống nhau).

edit:

Câu trả lời của Sven hoàn toàn đúng, nhưng đối với tính đầy đủ, một bài kiểm tra được cập nhật sẽ chạy trên bộ lọc thực tế:

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )

Kết quả thực sự cho thấy điều gì có ý nghĩa hơn với

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
0 và
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
1 đều là người nhanh nhất, mặc dù bộ không thực sự chậm hơn, vì vậy nó đã giành được sự khác biệt về những gì sẽ sử dụng:

set 27.868000030517578
list 27.131999969482422
tuple 27.138000011444092

Javastreamingcontext, javadstream và javapairdstream

  • Kafkautils, Kinesisutils
  • Tài liệu Python
  • Thêm các ví dụ về Scala và Java và Python
  • Giấy và video mô tả phát trực tuyến tia lửa.
    • Hướng dẫn lập trình phát trực tuyến Spark
    • Ghi chú
    • Tổng quan
    • Một ví dụ nhanh chóng
    • Các khái niệm cơ bản
    • Liên kết
    • Khởi tạo StreamingContext
    • Các luồng rời rạc (DSTREAM)
    • Đầu vào DSTREAM và người nhận
    • Các phép biến đổi trên Dstreams
    • Hoạt động đầu ra trên Dstreams
    • Các hoạt động của DataFrame và SQL
    • Hoạt động Mllib
  • Bộ nhớ đệm / kiên trì
    • Kiểm tra
    • Bộ tích lũy, biến phát sóng và trạm kiểm soát
    • Triển khai các ứng dụng
  • Giám sát các ứng dụng
  • Đi đâu từ đây

Kafkautils, Kinesisutils

Tài liệu Python

Tài liệu Python

Thêm các ví dụ về Scala và Java và Python

Hướng dẫn are python filters faster? - Bộ lọc python có nhanh hơn không?

Giấy và video mô tả phát trực tuyến tia lửa.

Hướng dẫn are python filters faster? - Bộ lọc python có nhanh hơn không?

Hướng dẫn lập trình phát trực tuyến Spark

Ghi chú

Tổng quan There are a few APIs that are either different or not available in Python. Throughout this guide, you will find the tag Python API highlighting these differences.


Thêm các ví dụ về Scala và Java và Python

Giấy và video mô tả phát trực tuyến tia lửa.

Hướng dẫn lập trình phát trực tuyến Spark

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

Ghi chú

Tổng quan

Một ví dụ nhanh chóng

// Split each line into words
val words = lines.flatMap(_.split(" "))

Các khái niệm cơ bản

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

Liên kết

Lưu ý rằng khi các dòng này được thực thi, Spark Stream chỉ thiết lập tính toán mà nó sẽ thực hiện khi nó được bắt đầu và chưa có xử lý thực sự nào bắt đầu. Để bắt đầu xử lý sau khi tất cả các phép biến đổi đã được thiết lập, cuối cùng chúng tôi cũng gọi

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

Mã hoàn chỉnh có thể được tìm thấy trong ví dụ Spark Streaming NetworkCount.

Đầu tiên, chúng tôi tạo một đối tượng JavastreamingContext, đây là điểm nhập chính cho tất cả các chức năng phát trực tuyến. Chúng tôi tạo một streamingcontext cục bộ với hai luồng thực thi và khoảng thời gian hàng loạt là 1 giây.

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

Sử dụng bối cảnh này, chúng ta có thể tạo một DSTREAM đại diện cho dữ liệu phát trực tuyến từ nguồn TCP, được chỉ định là tên máy chủ (ví dụ:

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
8) và cổng (ví dụ:
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
9).

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
0

Dstream

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
00 này thể hiện luồng dữ liệu sẽ được nhận từ máy chủ dữ liệu. Mỗi bản ghi trong luồng này là một dòng văn bản. Sau đó, chúng tôi muốn chia các dòng theo không gian thành từ.

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
1

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
01 là một hoạt động Dstream tạo ra một Dstream mới bằng cách tạo nhiều bản ghi mới từ mỗi bản ghi trong DSTREAM nguồn. Trong trường hợp này, mỗi dòng sẽ được chia thành nhiều từ và luồng từ được biểu diễn dưới dạng
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
02 Dstream. Lưu ý rằng chúng tôi đã xác định chuyển đổi bằng cách sử dụng đối tượng flatMapFunction. Như chúng ta sẽ khám phá trên đường đi, có một số lớp tiện lợi như vậy trong API Java giúp xác định các phép biến đổi DSTREAM.

Tiếp theo, chúng tôi muốn đếm những từ này.

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
2

DSTREAM

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
02 được ánh xạ thêm (chuyển đổi một-một) thành DSTREAM của các cặp
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
04, sử dụng đối tượng cặp. Sau đó, nó bị giảm để có được tần suất của các từ trong mỗi lô dữ liệu, sử dụng một đối tượng Function2. Cuối cùng,
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
05 sẽ in một vài trong số các số được tạo ra mỗi giây.

Lưu ý rằng khi các dòng này được thực thi, Spark Stream chỉ thiết lập tính toán mà nó sẽ thực hiện sau khi nó được bắt đầu và chưa có xử lý thực sự nào bắt đầu. Để bắt đầu xử lý sau khi tất cả các phép biến đổi đã được thiết lập, cuối cùng chúng tôi gọi phương thức

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
14.

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
3

Mã hoàn chỉnh có thể được tìm thấy trong ví dụ phát trực tuyến Spark JavanetWorkwordCount.

Đầu tiên, chúng tôi nhập StreamingContext, đây là điểm nhập chính cho tất cả các chức năng phát trực tuyến. Chúng tôi tạo một streamingcontext cục bộ với hai luồng thực thi và khoảng thời gian hàng loạt là 1 giây.

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
4

Sử dụng bối cảnh này, chúng ta có thể tạo một DSTREAM đại diện cho dữ liệu phát trực tuyến từ nguồn TCP, được chỉ định là tên máy chủ (ví dụ:

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
8) và cổng (ví dụ:
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
9).

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
5

Dstream

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
00 này thể hiện luồng dữ liệu sẽ được nhận từ máy chủ dữ liệu. Mỗi bản ghi trong luồng này là một dòng văn bản. Sau đó, chúng tôi muốn chia các dòng theo không gian thành từ.

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
01 là một hoạt động Dstream tạo ra một Dstream mới bằng cách tạo nhiều bản ghi mới từ mỗi bản ghi trong DSTREAM nguồn. Trong trường hợp này, mỗi dòng sẽ được chia thành nhiều từ và luồng từ được biểu diễn dưới dạng
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
02 Dstream. Lưu ý rằng chúng tôi đã xác định chuyển đổi bằng cách sử dụng đối tượng flatMapFunction. Như chúng ta sẽ khám phá trên đường đi, có một số lớp tiện lợi như vậy trong API Java giúp xác định các phép biến đổi DSTREAM.

Tiếp theo, chúng tôi muốn đếm những từ này.

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
7

DSTREAM

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
02 được ánh xạ thêm (chuyển đổi một-một) thành DSTREAM của các cặp
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
04, sử dụng đối tượng cặp. Sau đó, nó bị giảm để có được tần suất của các từ trong mỗi lô dữ liệu, sử dụng một đối tượng Function2. Cuối cùng,
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
05 sẽ in một vài trong số các số được tạo ra mỗi giây.

Lưu ý rằng khi các dòng này được thực thi, Spark Stream chỉ thiết lập tính toán mà nó sẽ thực hiện khi nó được bắt đầu và chưa có xử lý thực sự nào bắt đầu. Để bắt đầu xử lý sau khi tất cả các phép biến đổi đã được thiết lập, cuối cùng chúng tôi cũng gọi

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
8

Mã hoàn chỉnh có thể được tìm thấy trong ví dụ Spark Streaming NetworkCount.

Đầu tiên, chúng tôi tạo một đối tượng JavastreamingContext, đây là điểm nhập chính cho tất cả các chức năng phát trực tuyến. Chúng tôi tạo một streamingcontext cục bộ với hai luồng thực thi và khoảng thời gian hàng loạt là 1 giây.

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
9

Sử dụng bối cảnh này, chúng ta có thể tạo một DSTREAM đại diện cho dữ liệu phát trực tuyến từ nguồn TCP, được chỉ định là tên máy chủ (ví dụ:

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
8) và cổng (ví dụ:
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
9).

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
0

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
1

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
2

Dstream

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
00 này thể hiện luồng dữ liệu sẽ được nhận từ máy chủ dữ liệu. Mỗi bản ghi trong luồng này là một dòng văn bản. Sau đó, chúng tôi muốn chia các dòng theo không gian thành từ.

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
01 là một hoạt động Dstream tạo ra một Dstream mới bằng cách tạo nhiều bản ghi mới từ mỗi bản ghi trong DSTREAM nguồn. Trong trường hợp này, mỗi dòng sẽ được chia thành nhiều từ và luồng từ được biểu diễn dưới dạng
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
02 Dstream. Lưu ý rằng chúng tôi đã xác định chuyển đổi bằng cách sử dụng đối tượng flatMapFunction. Như chúng ta sẽ khám phá trên đường đi, có một số lớp tiện lợi như vậy trong API Java giúp xác định các phép biến đổi DSTREAM.

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
4

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
5

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
6



Tiếp theo, chúng tôi muốn đếm những từ này.

DSTREAM

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
02 được ánh xạ thêm (chuyển đổi một-một) thành DSTREAM của các cặp
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
04, sử dụng đối tượng cặp. Sau đó, nó bị giảm để có được tần suất của các từ trong mỗi lô dữ liệu, sử dụng một đối tượng Function2. Cuối cùng,
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
05 sẽ in một vài trong số các số được tạo ra mỗi giây.

Lưu ý rằng khi các dòng này được thực thi, Spark Stream chỉ thiết lập tính toán mà nó sẽ thực hiện sau khi nó được bắt đầu và chưa có xử lý thực sự nào bắt đầu. Để bắt đầu xử lý sau khi tất cả các phép biến đổi đã được thiết lập, cuối cùng chúng tôi gọi phương thức set 1.9240000247955322 list 8.82200002670288 tuple 7.031999826431274 14.

Mã hoàn chỉnh có thể được tìm thấy trong ví dụ phát trực tuyến Spark JavanetWorkwordCount.

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
7

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
8

Đầu tiên, chúng tôi nhập StreamingContext, đây là điểm nhập chính cho tất cả các chức năng phát trực tuyến. Chúng tôi tạo một streamingcontext cục bộ với hai luồng thực thi và khoảng thời gian hàng loạt là 1 giây.

NguồnCổ vật
Kafka Spark-Streaming-Kafka-0-10_2.12
Kinesis
Spark-Streaming-Kinesis-ASL_2.12 [Giấy phép phần mềm Amazon]

Để biết danh sách cập nhật, vui lòng tham khảo kho lưu trữ Maven để biết danh sách đầy đủ các nguồn và hiện vật được hỗ trợ.


Khởi tạo StreamingContext

Để khởi tạo một chương trình phát trực tuyến Spark, một đối tượng StreamingContext phải được tạo, đây là điểm nhập chính của tất cả các chức năng phát luồng tia lửa.StreamingContext object has to be created which is the main entry point of all Spark Streaming functionality.

Một đối tượng StreamingContext có thể được tạo từ một đối tượng SparkConf.

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
9

Tham số

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
24 là tên để ứng dụng của bạn hiển thị trên UI cụm.
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
25 là một tia lửa, mesos, kubernetes hoặc url cụm sợi hoặc chuỗi đặc biệt cục bộ [*] []] để chạy ở chế độ cục bộ. Trong thực tế, khi chạy trên một cụm, bạn sẽ không muốn mã hóa
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
25 trong chương trình, mà là khởi chạy ứng dụng với
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
27 và nhận nó ở đó. Tuy nhiên, đối với các thử nghiệm và thử nghiệm đơn vị cục bộ, bạn có thể vượt qua cục bộ [*] để chạy phát hiện phát hiện trong quá trình (phát hiện số lượng lõi trong hệ thống cục bộ). Lưu ý rằng điều này tạo ra một sparkcontext (điểm bắt đầu của tất cả các chức năng tia lửa) có thể được truy cập là
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
28.“local[*]” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
25 in the program, but rather launch the application with
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
27 and receive it there. However, for local testing and unit tests, you can pass “local[*]” to run Spark Streaming in-process (detects the number of cores in the local system). Note that this internally creates a SparkContext (starting point of all Spark functionality) which can be accessed as
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
28.

Khoảng thời gian hàng loạt phải được đặt dựa trên các yêu cầu về độ trễ của ứng dụng của bạn và tài nguyên cụm có sẵn. Xem phần điều chỉnh hiệu suất để biết thêm chi tiết.

Một đối tượng

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
29 cũng có thể được tạo từ một đối tượng
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
30 hiện có.

set 27.868000030517578
list 27.131999969482422
tuple 27.138000011444092
0

Một đối tượng JavastreamingContext có thể được tạo từ một đối tượng SparkConf.

set 27.868000030517578
list 27.131999969482422
tuple 27.138000011444092
1

Tham số

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
24 là tên để ứng dụng của bạn hiển thị trên UI cụm.
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
25 là một tia lửa, mesos hoặc url cụm sợi hoặc chuỗi đặc biệt cục bộ [*] để chạy ở chế độ cục bộ. Trong thực tế, khi chạy trên một cụm, bạn sẽ không muốn mã hóa
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
25 trong chương trình, mà là khởi chạy ứng dụng với
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
27 và nhận nó ở đó. Tuy nhiên, đối với các thử nghiệm và thử nghiệm đơn vị cục bộ, bạn có thể vượt qua địa phương [*] để chạy quá trình phát trực tuyến tia lửa. Lưu ý rằng điều này tạo ra một javasparkContext (điểm bắt đầu của tất cả các chức năng tia lửa) có thể được truy cập là
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
28.“local[*]” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
25 in the program, but rather launch the application with
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
27 and receive it there. However, for local testing and unit tests, you can pass “local[*]” to run Spark Streaming in-process. Note that this internally creates a JavaSparkContext (starting point of all Spark functionality) which can be accessed as
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
28.

Khoảng thời gian hàng loạt phải được đặt dựa trên các yêu cầu về độ trễ của ứng dụng của bạn và tài nguyên cụm có sẵn. Xem phần điều chỉnh hiệu suất để biết thêm chi tiết.

Một đối tượng

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
29 cũng có thể được tạo từ một đối tượng
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
30 hiện có.

set 27.868000030517578
list 27.131999969482422
tuple 27.138000011444092
2

Một đối tượng JavastreamingContext có thể được tạo từ một đối tượng SparkConf.

set 27.868000030517578
list 27.131999969482422
tuple 27.138000011444092
3

Tham số

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
24 là tên để ứng dụng của bạn hiển thị trên UI cụm.
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
25 là một tia lửa, mesos hoặc url cụm sợi hoặc chuỗi đặc biệt cục bộ [*] để chạy ở chế độ cục bộ. Trong thực tế, khi chạy trên một cụm, bạn sẽ không muốn mã hóa
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
25 trong chương trình, mà là khởi chạy ứng dụng với
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
27 và nhận nó ở đó. Tuy nhiên, đối với các thử nghiệm và thử nghiệm đơn vị cục bộ, bạn có thể vượt qua địa phương [*] để chạy quá trình phát trực tuyến tia lửa. Lưu ý rằng điều này tạo ra một javasparkContext (điểm bắt đầu của tất cả các chức năng tia lửa) có thể được truy cập là
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
28.“local[*]” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
25 in the program, but rather launch the application with
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
27 and receive it there. However, for local testing and unit tests, you can pass “local[*]” to run Spark Streaming in-process (detects the number of cores in the local system).

Khoảng thời gian hàng loạt phải được đặt dựa trên các yêu cầu về độ trễ của ứng dụng của bạn và tài nguyên cụm có sẵn. Xem phần điều chỉnh hiệu suất để biết thêm chi tiết.

Một đối tượng

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
29 cũng có thể được tạo từ một đối tượng
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
30 hiện có.

  1. Một đối tượng JavastreamingContext có thể được tạo từ một đối tượng SparkConf.
  2. Tham số
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    24 là tên để ứng dụng của bạn hiển thị trên UI cụm.
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    25 là một tia lửa, mesos hoặc url cụm sợi hoặc chuỗi đặc biệt cục bộ [*] để chạy ở chế độ cục bộ. Trong thực tế, khi chạy trên một cụm, bạn sẽ không muốn mã hóa
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    25 trong chương trình, mà là khởi chạy ứng dụng với
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    27 và nhận nó ở đó. Tuy nhiên, đối với các thử nghiệm và thử nghiệm đơn vị cục bộ, bạn có thể vượt qua địa phương [*] để chạy quá trình phát trực tuyến tia lửa. Lưu ý rằng điều này tạo ra một javasparkContext (điểm bắt đầu của tất cả các chức năng tia lửa) có thể được truy cập là
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    28.
  3. Một đối tượng
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    36 cũng có thể được tạo từ
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    37 hiện có.
  4. Một đối tượng StreamingContext có thể được tạo từ một đối tượng SparkContext.
  5. Tham số
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    24 là tên để ứng dụng của bạn hiển thị trên UI cụm.
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    25 là một tia lửa, mesos hoặc url cụm sợi hoặc chuỗi đặc biệt cục bộ [*] để chạy ở chế độ cục bộ. Trong thực tế, khi chạy trên một cụm, bạn sẽ không muốn mã hóa
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    25 trong chương trình, mà là khởi chạy ứng dụng với
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    27 và nhận nó ở đó. Tuy nhiên, đối với các thử nghiệm và thử nghiệm đơn vị cục bộ, bạn có thể vượt qua cục bộ [*] để chạy phát hiện phát hiện trong quá trình (phát hiện số lượng lõi trong hệ thống cục bộ).
Sau khi một bối cảnh được xác định, bạn phải làm như sau.
  • Xác định các nguồn đầu vào bằng cách tạo DSTREAM đầu vào.
  • Xác định các tính toán phát trực tuyến bằng cách áp dụng các hoạt động chuyển đổi và đầu ra cho DSTREAM.
  • Bắt đầu nhận dữ liệu và xử lý nó bằng
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    42.
  • Đợi quá trình xử lý bị dừng (thủ công hoặc do bất kỳ lỗi nào) bằng cách sử dụng
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    43.
  • Việc xử lý có thể được dừng thủ công bằng cách sử dụng
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    44.

Điểm cần nhớ:

Khi một bối cảnh đã được bắt đầu, không có tính toán phát trực tuyến mới nào có thể được thiết lập hoặc thêm vào nó. or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable, distributed dataset (see Spark Programming Guide for more details). Each RDD in a DStream contains data from a certain interval, as shown in the following figure.

Hướng dẫn are python filters faster? - Bộ lọc python có nhanh hơn không?

Bất kỳ hoạt động nào được áp dụng trên DSTREAM đều chuyển sang các hoạt động trên các RDD bên dưới. Ví dụ, trong ví dụ trước đó về việc chuyển đổi một dòng dòng thành từ, hoạt động

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
01 được áp dụng trên mỗi RDD trong DSTREAM
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
00 để tạo RDD của DSTREAM
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
02. Điều này được thể hiện trong hình dưới đây.

Hướng dẫn are python filters faster? - Bộ lọc python có nhanh hơn không?

Những biến đổi RDD cơ bản này được tính toán bởi động cơ tia lửa. Các hoạt động DSTREAM ẩn hầu hết các chi tiết này và cung cấp cho nhà phát triển API cấp cao hơn để thuận tiện. Các hoạt động này được thảo luận chi tiết trong các phần sau.


Đầu vào DSTREAM và người nhận

DSTREAM đầu vào là DSTREAM đại diện cho luồng dữ liệu đầu vào nhận được từ các nguồn phát trực tuyến. Trong ví dụ nhanh,

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
00 là một Dstream đầu vào vì nó đại diện cho luồng dữ liệu nhận được từ máy chủ Netcat. Mỗi Dstream đầu vào (trừ luồng tệp, được thảo luận sau trong phần này) được liên kết với đối tượng máy thu (Scala Doc, Java Doc) nhận dữ liệu từ một nguồn và lưu trữ nó trong bộ nhớ Spark Spark để xử lý.Receiver (Scala doc, Java doc) object which receives the data from a source and stores it in Spark’s memory for processing.

Spark Streaming cung cấp hai loại nguồn phát trực tuyến tích hợp.

  • Nguồn cơ bản: Các nguồn có sẵn trực tiếp trong API phát trực tuyến. Ví dụ: Hệ thống tệp và kết nối ổ cắm.
  • Nguồn nâng cao: Các nguồn như Kafka, Kinesis, v.v. có sẵn thông qua các lớp tiện ích bổ sung. Những điều này yêu cầu liên kết với các phụ thuộc thêm như được thảo luận trong phần liên kết.

Chúng tôi sẽ thảo luận về một số nguồn có mặt trong mỗi loại sau này trong phần này.

Lưu ý rằng, nếu bạn muốn nhận nhiều luồng dữ liệu song song trong ứng dụng phát trực tuyến của mình, bạn có thể tạo nhiều DStream đầu vào (được thảo luận thêm trong phần điều chỉnh hiệu suất). Điều này sẽ tạo ra nhiều máy thu sẽ đồng thời nhận nhiều luồng dữ liệu. Nhưng lưu ý rằng một công nhân/giám đốc điều hành Spark là một nhiệm vụ dài hạn, do đó nó chiếm một trong những lõi được phân bổ cho ứng dụng phát trực tuyến Spark. Do đó, điều quan trọng cần nhớ là một ứng dụng phát tia tia lửa cần được phân bổ đủ lõi (hoặc luồng, nếu chạy cục bộ) để xử lý dữ liệu nhận được, cũng như để chạy (các) máy thu.

Điểm để nhớ
  • Khi chạy một chương trình phát trực tuyến tia lửa cục bộ, không sử dụng địa phương của địa phương hoặc địa phương [1] là URL chính. Một trong hai có nghĩa là chỉ có một luồng sẽ được sử dụng để chạy các tác vụ cục bộ. Nếu bạn đang sử dụng Dstream đầu vào dựa trên máy thu (ví dụ: ổ cắm, kafka, v.v.), thì luồng đơn sẽ được sử dụng để chạy máy thu, không để lại luồng để xử lý dữ liệu nhận được. Do đó, khi chạy cục bộ, luôn luôn sử dụng địa phương [n], làm URL chính, trong đó số lượng máy thu để chạy (xem Thuộc tính Spark để biết thông tin về cách đặt Master).

  • Mở rộng logic để chạy trên một cụm, số lượng lõi được phân bổ cho ứng dụng phát tia tia lửa phải nhiều hơn số lượng máy thu. Nếu không, hệ thống sẽ nhận được dữ liệu, nhưng không thể xử lý nó.

Nguồn cơ bản

Chúng tôi đã xem xét

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
51 trong ví dụ nhanh tạo ra Dstream từ dữ liệu văn bản nhận được qua kết nối ổ cắm TCP. Bên cạnh ổ cắm, API phát trực tuyến cung cấp các phương thức tạo DSTREAM từ các tệp dưới dạng nguồn đầu vào.

Luồng tệp

Để đọc dữ liệu từ các tệp trên bất kỳ hệ thống tệp nào tương thích với API HDFS (nghĩa là HDFS, S3, NFS, v.v.), có thể tạo DSTREAM như thông qua

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
52.

Các luồng tệp không yêu cầu chạy máy thu để không cần phân bổ bất kỳ lõi nào để nhận dữ liệu tệp.

Đối với các tệp văn bản đơn giản, phương pháp dễ nhất là

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
53.

set 27.868000030517578
list 27.131999969482422
tuple 27.138000011444092
4

Cho các tập tin văn bản

set 27.868000030517578
list 27.131999969482422
tuple 27.138000011444092
5

set 27.868000030517578
list 27.131999969482422
tuple 27.138000011444092
6

Cho các tập tin văn bản

set 27.868000030517578
list 27.131999969482422
tuple 27.138000011444092
7

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
54 không có sẵn trong API Python; Chỉ có
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
55 có sẵn.

set 27.868000030517578
list 27.131999969482422
tuple 27.138000011444092
5

Cách theo dõi các thư mục

Spark Streaming sẽ giám sát thư mục

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
56 và xử lý bất kỳ tệp nào được tạo trong thư mục đó.

  • Một thư mục đơn giản có thể được theo dõi, chẳng hạn như
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    57. Tất cả các tệp trực tiếp theo một đường dẫn như vậy sẽ được xử lý khi chúng được phát hiện.
  • Một mẫu Glob Posix có thể được cung cấp, chẳng hạn như
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    58. Ở đây, Dstream sẽ bao gồm tất cả các tệp trong các thư mục phù hợp với mẫu. Đó là: nó là một mô hình của các thư mục, không phải của các tập tin trong các thư mục.
  • Tất cả các tập tin phải ở cùng định dạng dữ liệu.
  • Một tệp được coi là một phần của khoảng thời gian dựa trên thời gian sửa đổi của nó, không phải thời gian tạo của nó.
  • Sau khi được xử lý, các thay đổi thành tệp trong cửa sổ hiện tại sẽ không khiến tệp được đọc lại. Đó là: Cập nhật bị bỏ qua.
  • Càng nhiều tệp trong thư mục, sẽ càng nhiều để quét các thay đổi - ngay cả khi không có tệp nào được sửa đổi.
  • Nếu một ký tự đại diện được sử dụng để xác định các thư mục, chẳng hạn như
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    59, đổi tên toàn bộ thư mục để khớp đường dẫn sẽ thêm thư mục vào danh sách các thư mục được giám sát. Chỉ các tệp trong thư mục có thời gian sửa đổi nằm trong cửa sổ hiện tại sẽ được bao gồm trong luồng.
  • Gọi
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    60 để khắc phục dấu thời gian là một cách để chọn tệp trong cửa sổ sau, ngay cả khi nội dung của nó không thay đổi.
Sử dụng các lưu trữ đối tượng làm nguồn dữ liệu

Các hệ thống tập tin đầy đủ của các hệ thống tập tin như HDF có xu hướng đặt thời gian sửa đổi trên các tệp của họ ngay khi luồng đầu ra được tạo. Khi một tệp được mở, ngay cả trước khi dữ liệu được viết hoàn toàn, nó có thể được bao gồm trong

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
61 - sau đó các bản cập nhật cho tệp trong cùng một cửa sổ sẽ bị bỏ qua. Đó là: các thay đổi có thể bị bỏ lỡ và dữ liệu được bỏ qua từ luồng.

Để đảm bảo rằng các thay đổi được chọn trong cửa sổ, hãy viết tệp vào thư mục không được giám sát, sau đó, ngay sau khi luồng đầu ra được đóng, đổi tên nó thành thư mục đích. Với điều kiện tệp được đổi tên xuất hiện trong thư mục đích được quét trong cửa sổ tạo của nó, dữ liệu mới sẽ được chọn.

Ngược lại, các cửa hàng đối tượng như Amazon S3 và Azure Storage thường có các hoạt động đổi tên chậm, vì dữ liệu thực sự được sao chép. Hơn nữa, đối tượng được đổi tên thành có thể có thời gian hoạt động

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
62 là thời gian sửa đổi của nó, do đó có thể không được coi là một phần của cửa sổ mà thời gian tạo ban đầu đã ngụ ý.

Kiểm tra cẩn thận là cần thiết đối với kho lưu trữ đối tượng đích để xác minh rằng hành vi dấu thời gian của cửa hàng phù hợp với dự kiến ​​của phát trực tuyến Spark. Có thể việc viết trực tiếp vào thư mục đích là chiến lược phù hợp để truyền dữ liệu thông qua cửa hàng đối tượng đã chọn.

Để biết thêm chi tiết về chủ đề này, hãy tham khảo đặc tả hệ thống tập tin Hadoop.

Luồng dựa trên máy thu tùy chỉnh

DSTREAM có thể được tạo bằng các luồng dữ liệu nhận được thông qua các máy thu tùy chỉnh. Xem Hướng dẫn nhận tùy chỉnh để biết thêm chi tiết.

Hàng đợi các RDD dưới dạng luồng

Để kiểm tra ứng dụng phát tia tia lửa với dữ liệu thử nghiệm, người ta cũng có thể tạo DSTREAM dựa trên hàng đợi RDD, sử dụng

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
63. Mỗi RDD được đẩy vào hàng đợi sẽ được coi là một lô dữ liệu trong Dstream và được xử lý như một luồng.

Để biết thêm chi tiết về các luồng từ ổ cắm và tệp, hãy xem các tài liệu API của các chức năng liên quan trong StreamingContext cho Scala, JavastreamingContext cho Java và StreamingContext cho Python.

Nguồn tiên tiến

API Python Khi Spark 3.3.1, trong số các nguồn này, Kafka và Kinesis có sẵn trong API Python. As of Spark 3.3.1, out of these sources, Kafka and Kinesis are available in the Python API.

Danh mục nguồn này yêu cầu giao tiếp với các thư viện phi nước ngoài bên ngoài, một số trong số chúng có các phụ thuộc phức tạp (ví dụ: kafka). Do đó, để giảm thiểu các vấn đề liên quan đến xung đột phiên bản của các phụ thuộc, chức năng tạo DSTREAM từ các nguồn này đã được chuyển sang các thư viện riêng biệt có thể được liên kết với rõ ràng khi cần thiết.

Lưu ý rằng các nguồn nâng cao này không có sẵn trong Shark Shell, do đó các ứng dụng dựa trên các nguồn nâng cao này không thể được kiểm tra trong vỏ. Nếu bạn thực sự muốn sử dụng chúng trong Shark Shell, bạn sẽ phải tải xuống bình Maven Artifact tương ứng cùng với các phụ thuộc của nó và thêm nó vào đường dẫn lớp.

Một số trong các nguồn tiên tiến này như sau.

  • Kafka: Spark Streaming 3.3.1 tương thích với các phiên bản môi giới Kafka 0,10 trở lên. Xem Hướng dẫn tích hợp Kafka để biết thêm chi tiết. Spark Streaming 3.3.1 is compatible with Kafka broker versions 0.10 or higher. See the Kafka Integration Guide for more details.

  • Kinesis: Spark Streaming 3.3.1 tương thích với Thư viện máy khách Kinesis 1.2.1. Xem Hướng dẫn tích hợp Kinesis để biết thêm chi tiết. Spark Streaming 3.3.1 is compatible with Kinesis Client Library 1.2.1. See the Kinesis Integration Guide for more details.

Nguồn tùy chỉnh

API Python Điều này chưa được hỗ trợ trong Python. This is not yet supported in Python.

DSTREAM đầu vào cũng có thể được tạo ra từ các nguồn dữ liệu tùy chỉnh. Tất cả những gì bạn phải làm là thực hiện một máy thu do người dùng xác định (xem phần tiếp theo để hiểu đó là gì) có thể nhận dữ liệu từ các nguồn tùy chỉnh và đẩy nó vào Spark. Xem Hướng dẫn nhận tùy chỉnh để biết chi tiết.receiver (see next section to understand what that is) that can receive data from the custom sources and push it into Spark. See the Custom Receiver Guide for details.

Độ tin cậy của người nhận

Có thể có hai loại nguồn dữ liệu dựa trên độ tin cậy của chúng. Các nguồn (như Kafka) cho phép dữ liệu được chuyển được thừa nhận. Nếu hệ thống nhận dữ liệu từ các nguồn đáng tin cậy này thừa nhận chính xác dữ liệu nhận được, có thể đảm bảo rằng không có dữ liệu nào bị mất do bất kỳ loại thất bại nào. Điều này dẫn đến hai loại máy thu:

  1. Máy thu đáng tin cậy - Một máy thu đáng tin cậy gửi chính xác sự xác nhận đến một nguồn đáng tin cậy khi dữ liệu đã được nhận và lưu trữ trong Spark với sự sao chép.
  2. Máy thu không đáng tin cậy - Một người nhận không đáng tin cậy không gửi xác nhận đến một nguồn. Điều này có thể được sử dụng cho các nguồn không hỗ trợ xác nhận, hoặc thậm chí đối với các nguồn đáng tin cậy khi người ta không muốn hoặc cần đi vào sự phức tạp của sự thừa nhận.

Các chi tiết về cách viết một máy thu đáng tin cậy được thảo luận trong hướng dẫn nhận tùy chỉnh.


Các phép biến đổi trên Dstreams

Tương tự như các RDD, các phép biến đổi cho phép dữ liệu từ Dstream đầu vào được sửa đổi. DSTREAM hỗ trợ nhiều biến đổi có sẵn trên Spark RDD bình thường. Một số trong những cái phổ biến như sau.

Biến đổiNghĩa
Bản đồ (Func)(func) Trả về một dstream mới bằng cách chuyển từng phần tử của Dstream nguồn thông qua một hàm hàm.
FlatMap (Func)(func) Tương tự như bản đồ, nhưng mỗi mục đầu vào có thể được ánh xạ tới 0 hoặc nhiều mục đầu ra.
Bộ lọc (Func)(func) Trả về một DSTREAM mới bằng cách chỉ chọn các bản ghi của DSTREAM nguồn mà func trả về đúng.
Phản hồi (Numpartitions)(numPartitions) Thay đổi mức độ song song trong Dstream này bằng cách tạo ra nhiều hoặc ít phân vùng hơn.
Liên minh (Người khác)(otherStream) Trả về một DSTREAM mới chứa sự kết hợp của các yếu tố trong nguồn DSTREAM và OtherdStream.
đếm()() Trả về một DSTREAM mới của các RDD đơn vị đơn vị bằng cách đếm số lượng phần tử trong mỗi RDD của DSTREAM nguồn.
giảm (func)(func) Trả về một DSTREAM mới của các RDD đơn vị đơn vị bằng cách tổng hợp các phần tử trong mỗi RDD của DSTREAM nguồn bằng cách sử dụng func hàm (có hai đối số và trả về một đối số). Chức năng nên được kết hợp và giao hoán để nó có thể được tính toán song song.
CountbyValue ()() Khi được gọi trên một DSTREAM của các phần tử loại K, hãy trả về một DSTREAM mới của các cặp (k, dài) trong đó giá trị của mỗi khóa là tần số của nó trong mỗi RDD của DSTREAM nguồn.
Giảm(func, [numTasks]) Khi được gọi trên các cặp DSTREAM của (K, V), hãy trả về một DSTREAM mới của các cặp (K, V) trong đó các giá trị cho mỗi khóa được tổng hợp bằng cách sử dụng hàm giảm đã cho. Lưu ý: Theo mặc định, điều này sử dụng số lượng tác vụ song song mặc định của Spark (2 cho chế độ cục bộ và ở chế độ cụm, số được xác định bởi thuộc tính cấu hình
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
64) để thực hiện nhóm. Bạn có thể vượt qua đối số
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
65 tùy chọn để đặt một số tác vụ khác nhau.Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
64) to do the grouping. You can pass an optional
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
65 argument to set a different number of tasks.
Tham gia (OtherStream, [NUMTASKS])(otherStream, [numTasks]) Khi được gọi trên hai cặp DSTREAM của (K, V) và (K, W), hãy trả về các cặp Dstream mới của (K, (V, W)) với tất cả các cặp phần tử cho mỗi khóa.
Cogroup (OtherTream, [NUMTASK])(otherStream, [numTasks]) Khi được gọi trên các cặp DSTREAM của (K, V) và (K, W), hãy trả lại một dstream mới của (K, SEQ [V], SEQ [W]).
Biến đổi (Func)(func) Trả về DSTREAM mới bằng cách áp dụng chức năng RDD-to RDD cho mọi RDD của Dstream nguồn. Điều này có thể được sử dụng để thực hiện các hoạt động RDD tùy ý trên Dstream.
UpdatestateByKey (FUNC)(func) Trả về DSTREAM "Trạng thái" mới trong đó trạng thái cho mỗi khóa được cập nhật bằng cách áp dụng hàm đã cho vào trạng thái trước của khóa và các giá trị mới cho khóa. Điều này có thể được sử dụng để duy trì dữ liệu trạng thái tùy ý cho từng khóa.

Một vài trong số những biến đổi này đáng để thảo luận chi tiết hơn.

UpdateStateBykey hoạt động

Hoạt động

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
66 cho phép bạn duy trì trạng thái tùy ý trong khi liên tục cập nhật nó với thông tin mới. Để sử dụng điều này, bạn sẽ phải thực hiện hai bước.

  1. Xác định trạng thái - Trạng thái có thể là một loại dữ liệu tùy ý.
  2. Xác định chức năng cập nhật trạng thái - Chỉ định với chức năng Cách cập nhật trạng thái bằng trạng thái trước đó và các giá trị mới từ luồng đầu vào.

Trong mọi đợt, Spark sẽ áp dụng chức năng cập nhật trạng thái cho tất cả các khóa hiện có, bất kể chúng có dữ liệu mới trong một đợt hay không. Nếu chức năng cập nhật trả về

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
67 thì cặp giá trị khóa sẽ bị loại bỏ.

Hãy để minh họa điều này với một ví dụ. Giả sử bạn muốn duy trì số lượng chạy của từng từ được thấy trong luồng dữ liệu văn bản. Ở đây, số lượng chạy là trạng thái và nó là một số nguyên. Chúng tôi xác định chức năng cập nhật là:

set 27.868000030517578
list 27.131999969482422
tuple 27.138000011444092
9

Điều này được áp dụng trên một từ chứa Dstream (giả sử,

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
68 Dstream chứa các cặp
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
04 trong ví dụ trước).

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
0

Hàm cập nhật sẽ được gọi cho mỗi từ, với

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
70 có chuỗi 1 1 (từ các cặp
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
71) và
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
72 có số lượng trước đó.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
1

Điều này được áp dụng trên một từ chứa Dstream (giả sử,

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
68 Dstream chứa các cặp
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
04 trong ví dụ nhanh).

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
2

Hàm cập nhật sẽ được gọi cho mỗi từ, với

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
70 có chuỗi 1 1 (từ các cặp
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
71) và
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
72 có số lượng trước đó. Đối với mã Java hoàn chỉnh, hãy xem ví dụ javastateformetworkwordcount.java.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
3

Điều này được áp dụng trên một từ chứa Dstream (giả sử,

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
68 Dstream chứa các cặp
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
04 trong ví dụ trước).

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
4

Hàm cập nhật sẽ được gọi cho mỗi từ, với

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
70 có chuỗi 1 1 (từ các cặp
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
71) và
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
72 có số lượng trước đó.

Lưu ý rằng sử dụng

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
66 yêu cầu thư mục điểm kiểm tra được cấu hình, được thảo luận chi tiết trong phần kiểm tra.

Chuyển đổi hoạt động

Hoạt động

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
84 (cùng với các biến thể của nó như
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
85) cho phép các hàm RDD-to RDD tùy ý được áp dụng trên DSTREAM. Nó có thể được sử dụng để áp dụng bất kỳ hoạt động RDD nào không được phơi bày trong API Dstream. Ví dụ: chức năng của việc nối mọi lô trong luồng dữ liệu với một bộ dữ liệu khác không được hiển thị trực tiếp trong API DSTREAM. Tuy nhiên, bạn có thể dễ dàng sử dụng
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
84 để làm điều này. Điều này cho phép các khả năng rất mạnh mẽ. Ví dụ, người ta có thể thực hiện làm sạch dữ liệu thời gian thực bằng cách tham gia luồng dữ liệu đầu vào với thông tin spam được tính toán trước (cũng có thể được tạo bằng tia lửa) và sau đó lọc dựa trên nó.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
5

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
6

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
7

Lưu ý rằng chức năng được cung cấp được gọi trong mỗi khoảng thời gian. Điều này cho phép bạn thực hiện các hoạt động RDD thay đổi theo thời gian, nghĩa là các hoạt động RDD, số lượng phân vùng, biến phát sóng, v.v. có thể được thay đổi giữa các lô.

Hoạt động cửa sổ

Spark Streaming cũng cung cấp các tính toán có cửa sổ, cho phép bạn áp dụng các phép biến đổi trên một cửa sổ trượt dữ liệu. Hình sau đây minh họa cửa sổ trượt này.

Hướng dẫn are python filters faster? - Bộ lọc python có nhanh hơn không?

Như được hiển thị trong hình, mỗi khi cửa sổ trượt qua một nguồn nguồn, các RDD nguồn nằm trong cửa sổ được kết hợp và vận hành để tạo ra các RDD của DSTREAM có cửa sổ. Trong trường hợp cụ thể này, hoạt động được áp dụng trong 3 đơn vị dữ liệu cuối cùng và trượt theo 2 đơn vị thời gian. Điều này cho thấy rằng bất kỳ hoạt động cửa sổ cần chỉ định hai tham số.

  • Độ dài cửa sổ - Thời lượng của cửa sổ (3 trong hình).
  • Khoảng thời gian trượt - khoảng thời gian mà hoạt động cửa sổ được thực hiện (2 trong hình).

Hai tham số này phải là bội số của khoảng thời gian hàng loạt của Dstream nguồn (1 trong hình).

Hãy để minh họa các hoạt động cửa sổ với một ví dụ. Giả sử, bạn muốn mở rộng ví dụ trước đó bằng cách tạo số từ trong 30 giây cuối cùng của dữ liệu, cứ sau 10 giây. Để làm điều này, chúng tôi phải áp dụng hoạt động

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
87 trên các cặp
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
68 của các cặp
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
71 trong 30 giây cuối cùng của dữ liệu. Điều này được thực hiện bằng cách sử dụng hoạt động
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
90.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
8

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
9

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
0

Một số hoạt động cửa sổ phổ biến như sau. Tất cả các hoạt động này lấy hai tham số nói trên - Windowlowl và Slideinterval.

Biến đổiNghĩa
cửa sổ (Windowlowl, slideinterval)(windowLength, slideInterval) Trả về một DSTREAM mới được tính toán dựa trên các lô cửa sổ của DSTREAM nguồn.
CountbyWindow (Windowlowl, Slideinterval)(windowLength, slideInterval) Trả về một số lượng cửa sổ trượt của các phần tử trong luồng.
Giảm(func, windowLength, slideInterval) Trả về một luồng một phần tử mới, được tạo bằng cách tổng hợp các phần tử trong luồng trong một khoảng trượt bằng cách sử dụng func. Hàm phải được kết hợp và giao hoán để nó có thể được tính toán chính xác song song.
Giảm tốc độ (func, windowlow, slideinterval, [NUMTASKS]))(func, windowLength, slideInterval, [numTasks]) Khi được gọi trên các cặp DSTREAM của (K, V), hãy trả về các cặp Dstream (K, V) mới trong đó các giá trị cho mỗi khóa được tổng hợp bằng cách sử dụng hàm hàm giảm đã cho trên các lô trong cửa sổ trượt. Lưu ý: Theo mặc định, điều này sử dụng số lượng tác vụ song song mặc định của Spark (2 cho chế độ cục bộ và ở chế độ cụm, số được xác định bởi thuộc tính cấu hình
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
64) để thực hiện nhóm. Bạn có thể vượt qua đối số
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
65 tùy chọn để đặt một số tác vụ khác nhau.Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
64) to do the grouping. You can pass an optional
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
65 argument to set a different number of tasks.
Giảm trình giảm dần(func, invFunc, windowLength, slideInterval, [numTasks])

Một phiên bản hiệu quả hơn của

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
93 trên trong đó giá trị giảm của mỗi cửa sổ được tính toán tăng dần bằng cách sử dụng các giá trị giảm của cửa sổ trước. Điều này được thực hiện bằng cách giảm dữ liệu mới đi vào cửa sổ trượt và nghịch đảo làm giảm dữ liệu cũ rời khỏi cửa sổ. Một ví dụ sẽ là của việc thêm vào và trừ các phím của các khóa khi các khóa cửa sổ. Tuy nhiên, nó chỉ áp dụng cho các chức năng giảm khả năng đảo ngược, nghĩa là, các chức năng này có chức năng giảm tương ứng với chức năng giảm bớt (được coi là tham số InvFunc). Giống như trong
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
90, số lượng nhiệm vụ giảm có thể cấu hình thông qua một đối số tùy chọn. Lưu ý rằng việc kiểm tra phải được bật để sử dụng thao tác này.

CountbyValueandWindow (Windowlowl, Slideinterval, [NUMTASKS])(windowLength, slideInterval, [numTasks]) Khi được gọi trên các cặp DSTREAM của (K, V), hãy trả về các cặp Dstream mới của (K, Long) trong đó giá trị của mỗi khóa là tần số của nó trong một cửa sổ trượt. Giống như trong
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
90, số lượng nhiệm vụ giảm có thể cấu hình thông qua một đối số tùy chọn.

Tham gia hoạt động

Cuối cùng, nó đáng để làm nổi bật cách bạn có thể thực hiện các loại tham gia khác nhau trong phát trực tuyến tia lửa.

Stream-Stream tham gia

Các luồng có thể rất dễ dàng kết hợp với các luồng khác.

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
1

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
2

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
3

Ở đây, trong mỗi khoảng thời gian hàng loạt, RDD được tạo bởi

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
96 sẽ được nối với RDD được tạo bởi
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
97. Bạn cũng có thể làm
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
98,
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
99,
import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
00. Hơn nữa, thường rất hữu ích khi thực hiện tham gia qua các cửa sổ của các luồng. Đó là khá dễ dàng là tốt.

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
4

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
5

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
6

Stream-dataset tham gia

Điều này đã được hiển thị sớm hơn trong khi giải thích hoạt động

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
01. Đây là một ví dụ khác về việc tham gia một luồng cửa sổ với bộ dữ liệu.

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
7

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
8

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
9

Trên thực tế, bạn cũng có thể tự động thay đổi bộ dữ liệu bạn muốn tham gia. Hàm được cung cấp cho

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
84 được đánh giá mỗi khoảng thời gian hàng loạt và do đó sẽ sử dụng bộ dữ liệu hiện tại mà các điểm tham chiếu
import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
03.

Danh sách đầy đủ các biến đổi DSTREAM có sẵn trong tài liệu API. Đối với API Scala, xem Dstream và PairdStreamFiftions. Đối với API Java, xem Javadstream và Javapairdstream. Đối với API Python, xem Dstream.


Hoạt động đầu ra trên Dstreams

Các hoạt động đầu ra cho phép dữ liệu Dstream, được đẩy ra các hệ thống bên ngoài như cơ sở dữ liệu hoặc hệ thống tệp. Do các hoạt động đầu ra thực sự cho phép dữ liệu được chuyển đổi được tiêu thụ bởi các hệ thống bên ngoài, chúng kích hoạt việc thực hiện thực tế tất cả các phép biến đổi Dstream (tương tự như các hành động cho RDD). Hiện tại, các hoạt động đầu ra sau được xác định:

Hoạt động đầu raNghĩa
in()() In mười yếu tố đầu tiên của mỗi lô dữ liệu trong một DSTREAM trên nút trình điều khiển chạy ứng dụng phát trực tuyến. Điều này rất hữu ích cho sự phát triển và gỡ lỗi. API Python Đây được gọi là pprint () trong API Python.
Python API This is called pprint() in the Python API.
SAVEASTEXTFILES (tiền tố, [hậu tố])(prefix, [suffix]) Lưu nội dung của Dstream này dưới dạng tệp văn bản. Tên tệp ở mỗi khoảng thời gian hàng loạt được tạo dựa trên tiền tố và hậu tố: "Tiền tố thời gian_in_ms [.suffix]".
SaveasobjectFiles (tiền tố, [hậu tố])(prefix, [suffix]) Lưu nội dung của Dstream này dưới dạng
import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
04 của các đối tượng Java tuần tự hóa. Tên tệp ở mỗi khoảng thời gian hàng loạt được tạo dựa trên tiền tố và hậu tố: "Tiền tố thời gian_in_ms [.suffix]". API Python Điều này không có sẵn trong API Python.
Python API This is not available in the Python API.
Saveashadoopfiles (tiền tố, [hậu tố])(prefix, [suffix]) Lưu nội dung của Dstream này dưới dạng tệp Hadoop. Tên tệp ở mỗi khoảng thời gian hàng loạt được tạo dựa trên tiền tố và hậu tố: "Tiền tố thời gian_in_ms [.suffix]". API Python Điều này không có sẵn trong API Python.
Python API This is not available in the Python API.
foreachrdd (func)(func) Toán tử đầu ra chung nhất áp dụng chức năng, func, cho mỗi RDD được tạo từ luồng. Hàm này sẽ đẩy dữ liệu trong mỗi RDD vào một hệ thống bên ngoài, chẳng hạn như lưu RDD vào các tệp hoặc ghi nó qua mạng vào cơ sở dữ liệu. Lưu ý rằng hàm chức năng được thực thi trong quy trình trình điều khiển chạy ứng dụng phát trực tuyến và thường sẽ có các hành động RDD trong đó sẽ buộc tính toán RDD phát trực tuyến.

Các mẫu thiết kế để sử dụng foreachrdd

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
05 là một nguyên thủy mạnh mẽ cho phép dữ liệu được gửi đến các hệ thống bên ngoài. Tuy nhiên, điều quan trọng là phải hiểu làm thế nào để sử dụng nguyên thủy này một cách chính xác và hiệu quả. Một số sai lầm phổ biến cần tránh như sau.

Thường ghi dữ liệu vào hệ thống bên ngoài yêu cầu tạo đối tượng kết nối (ví dụ: kết nối TCP đến máy chủ từ xa) và sử dụng nó để gửi dữ liệu đến một hệ thống từ xa. Với mục đích này, một nhà phát triển có thể vô tình thử tạo một đối tượng kết nối tại trình điều khiển Spark, sau đó cố gắng sử dụng nó trong một nhân viên Spark để lưu các bản ghi trong RDD. Ví dụ (tính bằng Scala),

// Split each line into words
val words = lines.flatMap(_.split(" "))
0

// Split each line into words
val words = lines.flatMap(_.split(" "))
1

// Split each line into words
val words = lines.flatMap(_.split(" "))
2

Điều này là không chính xác vì điều này đòi hỏi đối tượng kết nối phải được tuần tự hóa và gửi từ trình điều khiển đến công nhân. Các đối tượng kết nối như vậy hiếm khi có thể chuyển nhượng trên các máy. Lỗi này có thể biểu hiện dưới dạng lỗi tuần tự hóa (đối tượng kết nối không thể tuần tự), lỗi khởi tạo (đối tượng kết nối cần được khởi tạo tại công nhân), v.v. Giải pháp chính xác là tạo đối tượng kết nối tại công nhân.

Tuy nhiên, điều này có thể dẫn đến một sai lầm phổ biến khác - tạo ra một kết nối mới cho mỗi bản ghi. Ví dụ,

// Split each line into words
val words = lines.flatMap(_.split(" "))
3

// Split each line into words
val words = lines.flatMap(_.split(" "))
4

// Split each line into words
val words = lines.flatMap(_.split(" "))
5

Thông thường, tạo một đối tượng kết nối có chi phí thời gian và tài nguyên. Do đó, việc tạo và phá hủy một đối tượng kết nối cho mỗi bản ghi có thể phát sinh các chi phí cao không cần thiết và có thể giảm đáng kể thông lượng tổng thể của hệ thống. Một giải pháp tốt hơn là sử dụng

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
06 - Tạo một đối tượng kết nối duy nhất và gửi tất cả các bản ghi trong phân vùng RDD bằng kết nối đó.

// Split each line into words
val words = lines.flatMap(_.split(" "))
6

// Split each line into words
val words = lines.flatMap(_.split(" "))
7

// Split each line into words
val words = lines.flatMap(_.split(" "))
8

Điều này khấu hao các chi phí tạo kết nối qua nhiều hồ sơ.

Cuối cùng, điều này có thể được tối ưu hóa hơn nữa bằng cách sử dụng lại các đối tượng kết nối trên nhiều RDD/lô. Người ta có thể duy trì một nhóm các đối tượng kết nối tĩnh hơn có thể được sử dụng lại vì các RDD của nhiều lô được đẩy đến hệ thống bên ngoài, do đó làm giảm thêm các chi phí.

// Split each line into words
val words = lines.flatMap(_.split(" "))
9

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
0

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
1

Lưu ý rằng các kết nối trong nhóm nên được tạo ra một cách uể oải theo yêu cầu và hết thời gian nếu không được sử dụng trong một thời gian. Điều này đạt được việc gửi dữ liệu hiệu quả nhất đến các hệ thống bên ngoài.

Những điểm khác cần nhớ:
  • DSTREAM được thực hiện một cách uể oải bởi các hoạt động đầu ra, giống như RDD được thực hiện một cách uể oải bởi các hành động RDD. Cụ thể, các hành động RDD bên trong các hoạt động đầu ra Dstream buộc xử lý dữ liệu nhận được. Do đó, nếu ứng dụng của bạn không có bất kỳ hoạt động đầu ra nào hoặc có các hoạt động đầu ra như

    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    07 mà không có bất kỳ hành động RDD nào bên trong chúng, thì sẽ không có gì được thực thi. Hệ thống sẽ chỉ đơn giản nhận được dữ liệu và loại bỏ nó.

  • Theo mặc định, các hoạt động đầu ra được thực hiện một lần một lần. Và chúng được thực thi theo thứ tự chúng được xác định trong ứng dụng.


Các hoạt động của DataFrame và SQL

Bạn có thể dễ dàng sử dụng các hoạt động DataFrames và SQL trên dữ liệu phát trực tuyến. Bạn phải tạo một Sparksession bằng cách sử dụng SparkContext mà StreamingContext đang sử dụng. Hơn nữa, điều này phải được thực hiện sao cho nó có thể được khởi động lại khi lỗi trình điều khiển. Điều này được thực hiện bằng cách tạo ra một ví dụ đơn giản của Sparksession. Điều này được hiển thị trong ví dụ sau. Nó sửa đổi ví dụ đếm từ trước để tạo số từ sử dụng DataFrames và SQL. Mỗi RDD được chuyển đổi thành DataFrame, được đăng ký dưới dạng bảng tạm thời và sau đó được truy vấn bằng SQL.

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
2

Xem mã nguồn đầy đủ.

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
3

Xem mã nguồn đầy đủ.

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
4

Xem mã nguồn đầy đủ.

Bạn cũng có thể chạy các truy vấn SQL trên các bảng được xác định trên dữ liệu phát trực tuyến từ một luồng khác (nghĩa là không đồng bộ với StreamingContext đang chạy). Chỉ cần đảm bảo rằng bạn đặt StreamingContext để nhớ một lượng dữ liệu phát trực tuyến đủ sao cho truy vấn có thể chạy. Mặt khác, StreamingContext, không biết về bất kỳ truy vấn SQL không đồng bộ nào, sẽ xóa dữ liệu phát trực tuyến cũ trước khi truy vấn có thể hoàn thành. Ví dụ: nếu bạn muốn truy vấn lô cuối cùng, nhưng truy vấn của bạn có thể mất 5 phút để chạy, thì hãy gọi

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
08 (tính bằng Scala hoặc tương đương trong các ngôn ngữ khác).

Xem Hướng dẫn DataFrames và SQL để tìm hiểu thêm về DataFrames.


Hoạt động Mllib

Bạn cũng có thể dễ dàng sử dụng các thuật toán học máy được cung cấp bởi MLLIB. Trước hết, có các thuật toán học máy phát trực tuyến (ví dụ: hồi quy tuyến tính phát trực tuyến, phát trực tuyến kmeans, v.v.) có thể học đồng thời từ dữ liệu phát trực tuyến cũng như áp dụng mô hình trên dữ liệu phát trực tuyến. Ngoài ra, đối với một lớp thuật toán học máy lớn hơn nhiều, bạn có thể tìm hiểu một mô hình học tập ngoại tuyến (nghĩa là sử dụng dữ liệu lịch sử) và sau đó áp dụng mô hình trực tuyến trên dữ liệu phát trực tuyến. Xem Hướng dẫn MLLIB để biết thêm chi tiết.


Bộ nhớ đệm / kiên trì

Tương tự như RDD, DSTREAM cũng cho phép các nhà phát triển tồn tại dữ liệu luồng trong bộ nhớ. Đó là, sử dụng phương thức

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
09 trên DSTREAM sẽ tự động tồn tại mọi RDD của DSTREAM trong bộ nhớ. Điều này rất hữu ích nếu dữ liệu trong dstream sẽ được tính toán nhiều lần (ví dụ: nhiều hoạt động trên cùng một dữ liệu). Đối với các hoạt động dựa trên cửa sổ như
import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
10 và
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
90 và các hoạt động dựa trên trạng thái như
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
66, điều này hoàn toàn đúng. Do đó, các DSTREAM được tạo bởi các hoạt động dựa trên cửa sổ được tự động tồn tại trong bộ nhớ mà không cần nhà phát triển gọi
import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
09.

Đối với các luồng đầu vào nhận dữ liệu qua mạng (chẳng hạn như Kafka, ổ cắm, v.v.), mức độ tồn tại mặc định được đặt để sao chép dữ liệu thành hai nút để chịu lỗi.

Lưu ý rằng, không giống như RDD, mức độ tồn tại mặc định của các DSTREAM giữ dữ liệu được tuần tự hóa trong bộ nhớ. Điều này được thảo luận thêm trong phần điều chỉnh hiệu suất. Thông tin thêm về các mức độ kiên trì khác nhau có thể được tìm thấy trong Hướng dẫn lập trình tia lửa.


Kiểm tra

Một ứng dụng phát trực tuyến phải hoạt động 24/7 và do đó phải có khả năng phục hồi không liên quan đến logic ứng dụng (ví dụ: lỗi hệ thống, sự cố JVM, v.v.). Để điều này là có thể, Spark Streaming cần kiểm tra đủ thông tin đến một hệ thống lưu trữ có lỗi để nó có thể phục hồi sau khi thất bại. Có hai loại dữ liệu được chỉ định.

  • CheckPoinTing siêu dữ liệu - Lưu thông tin xác định tính toán phát trực tuyến để lưu trữ chịu lỗi như HDFS. Điều này được sử dụng để phục hồi sau lỗi của nút chạy trình điều khiển của ứng dụng phát trực tuyến (được thảo luận chi tiết sau). Siêu dữ liệu bao gồm:
    • Cấu hình - Cấu hình được sử dụng để tạo ứng dụng phát trực tuyến.
    • Các hoạt động DSTREAM - tập hợp các hoạt động DSTREAM xác định ứng dụng phát trực tuyến.
    • Các lô không đầy đủ - Các lô có công việc được xếp hàng nhưng chưa hoàn thành.
  • Kiểm tra dữ liệu - Lưu các RDD được tạo vào lưu trữ đáng tin cậy. Điều này là cần thiết trong một số biến đổi trạng thái kết hợp dữ liệu trên nhiều lô. Trong các phép biến đổi như vậy, các RDD được tạo phụ thuộc vào RDD của các lô trước đó, điều này khiến chiều dài của chuỗi phụ thuộc tiếp tục tăng theo thời gian. Để tránh sự gia tăng không giới hạn như vậy về thời gian phục hồi (tỷ lệ với chuỗi phụ thuộc), các RDD trung gian của các biến đổi trạng thái được chỉ định kiểm tra để lưu trữ đáng tin cậy (ví dụ HDF) để cắt các chuỗi phụ thuộc.

Tóm lại, việc kiểm tra siêu dữ liệu chủ yếu là cần thiết để phục hồi từ các lỗi của trình điều khiển, trong khi dữ liệu hoặc kiểm tra RDD là cần thiết ngay cả đối với hoạt động cơ bản nếu sử dụng các phép biến đổi trạng thái.

Khi nào cần cho phép kiểm tra

Kiểm tra phải được bật cho các ứng dụng với bất kỳ yêu cầu nào sau đây:

  • Việc sử dụng các phép biến đổi trạng thái - Nếu
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    66 hoặc
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    90 (có hàm nghịch đảo) được sử dụng trong ứng dụng, thì thư mục điểm kiểm tra phải được cung cấp để cho phép kiểm tra RDD định kỳ.
  • Khôi phục sau khi thất bại của trình điều khiển chạy ứng dụng - Điểm kiểm tra siêu dữ liệu được sử dụng để khôi phục với thông tin tiến trình.

Lưu ý rằng các ứng dụng phát trực tuyến đơn giản mà không có các phép biến đổi trạng thái nói trên có thể được chạy mà không cho phép kiểm tra. Sự phục hồi từ các thất bại của người lái cũng sẽ là một phần trong trường hợp đó (một số dữ liệu nhận được nhưng chưa được xử lý có thể bị mất). Điều này thường được chấp nhận và nhiều ứng dụng phát luồng Spark chạy theo cách này. Hỗ trợ cho môi trường không hadoop dự kiến ​​sẽ cải thiện trong tương lai.

Cách định cấu hình kiểm tra

Có thể bật kiểm tra bằng cách đặt thư mục trong hệ thống tệp đáng tin cậy, chịu lỗi (ví dụ: HDFS, S3, v.v.) mà thông tin điểm kiểm tra sẽ được lưu. Điều này được thực hiện bằng cách sử dụng

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
16. Điều này sẽ cho phép bạn sử dụng các biến đổi trạng thái đã nói ở trên. Ngoài ra, nếu bạn muốn làm cho ứng dụng phục hồi sau khi lỗi trình điều khiển, bạn nên viết lại ứng dụng phát trực tuyến của mình để có hành vi sau.

  • Khi chương trình được bắt đầu lần đầu tiên, nó sẽ tạo ra một phát trực tuyến mới, thiết lập tất cả các luồng và sau đó gọi start ().
  • Khi chương trình đang được khởi động lại sau khi thất bại, nó sẽ tạo lại phát trực tuyến từ dữ liệu điểm kiểm tra trong thư mục điểm kiểm tra.

Hành vi này được thực hiện đơn giản bằng cách sử dụng

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
17. Điều này được sử dụng như sau.

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
5

Nếu

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
18 tồn tại, thì bối cảnh sẽ được tái tạo từ dữ liệu điểm kiểm tra. Nếu thư mục không tồn tại (nghĩa là chạy lần đầu tiên), thì hàm
import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
19 sẽ được gọi để tạo bối cảnh mới và thiết lập các DSTREAM. Xem ví dụ Scala RecoverableNetWorkwordCount. Ví dụ này bổ sung từ đếm dữ liệu mạng vào một tệp.

Hành vi này được thực hiện đơn giản bằng cách sử dụng

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
20. Điều này được sử dụng như sau.

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
6

Nếu

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
18 tồn tại, thì bối cảnh sẽ được tái tạo từ dữ liệu điểm kiểm tra. Nếu thư mục không tồn tại (nghĩa là chạy lần đầu tiên), thì hàm
import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
22 sẽ được gọi để tạo bối cảnh mới và thiết lập các DSTREAM. Xem ví dụ Java JavarecoverablenetworkingCount. Ví dụ này bổ sung từ đếm dữ liệu mạng vào một tệp.

Hành vi này được thực hiện đơn giản bằng cách sử dụng

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
17. Điều này được sử dụng như sau.

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
7

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
5

Nếu

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
18 tồn tại, thì bối cảnh sẽ được tái tạo từ dữ liệu điểm kiểm tra. Nếu thư mục không tồn tại (nghĩa là chạy lần đầu tiên), thì hàm
import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
19 sẽ được gọi để tạo bối cảnh mới và thiết lập các DSTREAM. Xem ví dụ Scala RecoverableNetWorkwordCount. Ví dụ này bổ sung từ đếm dữ liệu mạng vào một tệp.

Hành vi này được thực hiện đơn giản bằng cách sử dụng

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
20. Điều này được sử dụng như sau.

Nếu

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
18 tồn tại, thì bối cảnh sẽ được tái tạo từ dữ liệu điểm kiểm tra. Nếu thư mục không tồn tại (nghĩa là chạy lần đầu tiên), thì hàm
import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
22 sẽ được gọi để tạo bối cảnh mới và thiết lập các DSTREAM. Xem ví dụ Java JavarecoverablenetworkingCount. Ví dụ này bổ sung từ đếm dữ liệu mạng vào một tệp.


Bộ tích lũy, biến phát sóng và trạm kiểm soát

Các bộ tích lũy và các biến phát sóng không thể được phục hồi từ điểm kiểm tra trong luồng tia lửa. Nếu bạn cũng bật kiểm tra và sử dụng các bộ tích lũy hoặc các biến phát sóng, bạn sẽ phải tạo các phiên bản singleton được khởi tạo uể oải cho các bộ tích lũy và các biến phát sóng để chúng có thể được bảo vệ lại sau khi trình điều khiển khởi động lại khi thất bại. Điều này được hiển thị trong ví dụ sau.

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
8

Xem mã nguồn đầy đủ.

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
9

Xem mã nguồn đầy đủ.

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
0

Xem mã nguồn đầy đủ.


Triển khai các ứng dụng

Phần này thảo luận về các bước để triển khai một ứng dụng phát luồng tia lửa.

Yêu cầu

Để chạy một ứng dụng phát tia tia lửa, bạn cần phải có những điều sau đây.

  • Cụm với Trình quản lý cụm - Đây là yêu cầu chung của bất kỳ ứng dụng Spark nào và được thảo luận chi tiết trong Hướng dẫn triển khai.

  • Gói bình Ứng dụng - Bạn phải biên dịch ứng dụng phát trực tuyến của mình thành một bình. Nếu bạn đang sử dụng

    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    27 để bắt đầu ứng dụng, thì bạn sẽ không cần cung cấp tia lửa và tia lửa trong bình. Tuy nhiên, nếu ứng dụng của bạn sử dụng các nguồn nâng cao (ví dụ: Kafka), thì bạn sẽ phải đóng gói thêm cổ vật mà chúng liên kết, cùng với các phụ thuộc của chúng, trong bình được sử dụng để triển khai ứng dụng. Ví dụ: một ứng dụng sử dụng
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    31 sẽ phải bao gồm
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    32 và tất cả các phụ thuộc chuyển tiếp của nó trong bình ứng dụng.

  • Định cấu hình bộ nhớ đủ cho người thực thi - vì dữ liệu nhận được phải được lưu trữ trong bộ nhớ, các trình thực thi phải được cấu hình với bộ nhớ đủ để giữ dữ liệu nhận được. Lưu ý rằng nếu bạn đang thực hiện các hoạt động cửa sổ 10 phút, hệ thống phải giữ ít nhất 10 phút cuối cùng trong bộ nhớ. Vì vậy, các yêu cầu bộ nhớ cho ứng dụng phụ thuộc vào các hoạt động được sử dụng trong đó.

  • Định cấu hình kiểm tra - Nếu ứng dụng luồng yêu cầu, thì một thư mục trong bộ nhớ chịu lỗi tương thích API Hadoop (ví dụ: HDFS, S3, v.v.) được sử dụng để phục hồi thất bại. Xem phần CheckPoining để biết thêm chi tiết.

  • Định cấu hình tự động khởi động lại trình điều khiển ứng dụng - Để tự động khôi phục sau lỗi trình điều khiển, cơ sở hạ tầng triển khai được sử dụng để chạy ứng dụng phát trực tuyến phải theo dõi quy trình trình điều khiển và khởi động lại trình điều khiển nếu không thành công. Các nhà quản lý cụm khác nhau có các công cụ khác nhau để đạt được điều này.
    • Spark Standalone - Một trình điều khiển ứng dụng Spark có thể được gửi để chạy trong cụm độc lập Spark (xem chế độ triển khai cụm), nghĩa là trình điều khiển ứng dụng tự chạy trên một trong các nút công nhân. Hơn nữa, Trình quản lý cụm độc lập có thể được hướng dẫn để giám sát trình điều khiển và khởi chạy lại nếu trình điều khiển không thành công do mã thoát khác không hoặc do lỗi của nút chạy trình điều khiển. Xem Chế độ cụm và giám sát trong Hướng dẫn độc lập Spark để biết thêm chi tiết.
    • Sợi - Sợi hỗ trợ một cơ chế tương tự để tự động khởi động lại một ứng dụng. Vui lòng tham khảo tài liệu sợi để biết thêm chi tiết.
    • Mesos - Marathon đã được sử dụng để đạt được điều này với Mesos.
  • Định cấu hình nhật ký ghi lại-Vì Spark 1.2, chúng tôi đã giới thiệu các bản ghi ghi cách ghi để đạt được sự đảm bảo khả năng chịu lỗi mạnh mẽ. Nếu được bật, tất cả dữ liệu nhận được từ một người nhận sẽ được ghi vào nhật ký ghi ghi trong thư mục điểm kiểm tra cấu hình. Điều này ngăn ngừa mất dữ liệu về phục hồi trình điều khiển, do đó đảm bảo mất dữ liệu bằng không (được thảo luận chi tiết trong phần ngữ nghĩa dung sai lỗi). Điều này có thể được bật bằng cách đặt tham số cấu hình

    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    33 thành
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    34. Tuy nhiên, các ngữ nghĩa mạnh hơn này có thể đến với chi phí thông lượng nhận của các máy thu riêng lẻ. Điều này có thể được sửa chữa bằng cách chạy nhiều máy thu nhiều hơn để tăng thông lượng tổng hợp. Ngoài ra, khuyến cáo rằng việc sao chép dữ liệu nhận được trong Spark sẽ bị vô hiệu hóa khi nhật ký ghi ghi được bật vì nhật ký đã được lưu trữ trong hệ thống lưu trữ được sao chép. Điều này có thể được thực hiện bằng cách đặt mức lưu trữ cho luồng đầu vào thành
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    35. Mặc dù sử dụng S3 (hoặc bất kỳ hệ thống tệp nào không hỗ trợ Flushing) cho nhật ký ghi lại, vui lòng nhớ bật
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    36 và
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    37. Xem cấu hình phát trực tuyến Spark để biết thêm chi tiết. Lưu ý rằng Spark sẽ không mã hóa dữ liệu được ghi vào nhật ký ghi lại khi mã hóa I/O được bật. Nếu mã hóa dữ liệu nhật ký ghi lại là mong muốn, nó sẽ được lưu trữ trong một hệ thống tệp hỗ trợ mã hóa nguyên bản.

  • Đặt tỷ lệ nhận tối đa - Nếu tài nguyên cụm không đủ lớn để ứng dụng phát trực tuyến xử lý dữ liệu nhanh như nhận được, người nhận có thể bị giới hạn tốc độ bằng cách đặt giới hạn tốc độ tối đa theo hồ sơ / giây. Xem các tham số cấu hình
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    38 cho người nhận và
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    39 cho phương pháp Kafka trực tiếp. Trong Spark 1.5, chúng tôi đã giới thiệu một tính năng có tên là Backpressure loại bỏ sự cần thiết phải đặt giới hạn tốc độ này, vì phát luồng Spark sẽ tự động tìm ra giới hạn tốc độ và điều chỉnh động chúng nếu điều kiện xử lý thay đổi. Áp lực này có thể được bật bằng cách đặt tham số cấu hình
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    40 thành
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    34.

Nâng cấp mã ứng dụng

Nếu một ứng dụng phát luồng Spark đang chạy cần được nâng cấp với mã ứng dụng mới, thì có hai cơ chế có thể.

  • Ứng dụng phát trực tuyến Spark được nâng cấp được bắt đầu và chạy song song với ứng dụng hiện có. Một khi dữ liệu mới (nhận được dữ liệu giống như dữ liệu cũ) đã được làm nóng và sẵn sàng cho thời gian chính, cái cũ có thể được đưa xuống. Lưu ý rằng điều này có thể được thực hiện cho các nguồn dữ liệu hỗ trợ gửi dữ liệu đến hai điểm đến (tức là, các ứng dụng đã được nâng cấp trước đó và nâng cấp).

  • Ứng dụng hiện tại được tắt một cách duyên dáng (xem

    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    42 hoặc
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    43 để biết các tùy chọn tắt tiếng duyên dáng) để đảm bảo dữ liệu đã nhận được được xử lý hoàn toàn trước khi tắt. Sau đó, ứng dụng được nâng cấp có thể được bắt đầu, sẽ bắt đầu xử lý từ cùng một điểm mà ứng dụng trước đó đã tắt. Lưu ý rằng điều này chỉ có thể được thực hiện với các nguồn đầu vào hỗ trợ bộ đệm phía nguồn (như kafka) vì dữ liệu cần được đệm trong khi ứng dụng trước đó đã giảm và ứng dụng được nâng cấp vẫn chưa tăng. Và khởi động lại từ thông tin điểm kiểm tra trước đó của mã nâng cấp trước không thể được thực hiện. Thông tin điểm kiểm tra về cơ bản chứa các đối tượng Scala/Java/Python tuần tự và cố gắng khử các đối tượng với các lớp mới, được sửa đổi có thể dẫn đến lỗi. Trong trường hợp này, hoặc bắt đầu ứng dụng được nâng cấp với một thư mục điểm kiểm tra khác hoặc xóa thư mục điểm kiểm tra trước đó.


Giám sát các ứng dụng

Ngoài khả năng giám sát Spark Spark, có những khả năng bổ sung cụ thể cho phát trực tuyến Spark. Khi sử dụng StreamingContext, UI Web Spark hiển thị một tab

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
44 bổ sung hiển thị số liệu thống kê về máy thu chạy (cho dù máy thu đang hoạt động, số lượng hồ sơ nhận được, lỗi máy thu, v.v.) và hoàn thành các đợt (thời gian xử lý hàng loạt, chậm trễ, v.v. .). Điều này có thể được sử dụng để theo dõi tiến trình của ứng dụng phát trực tuyến.

Hai số liệu sau đây trong UI Web đặc biệt quan trọng:

  • Thời gian xử lý - Thời gian để xử lý từng lô dữ liệu.
  • Lập lịch chậm trễ - Thời gian một lô chờ đợi trong một hàng đợi để xử lý các lô trước đó để hoàn thành.

Nếu thời gian xử lý hàng loạt liên tục nhiều hơn khoảng thời gian hàng loạt và/hoặc độ trễ xếp hàng tiếp tục tăng, thì nó chỉ ra rằng hệ thống không thể xử lý các lô nhanh như chúng đang được tạo ra và bị tụt lại phía sau. Trong trường hợp đó, hãy xem xét giảm thời gian xử lý hàng loạt.

Tiến trình của chương trình phát trực tuyến tia lửa cũng có thể được theo dõi bằng giao diện StreamingListener, cho phép bạn có được trạng thái máy thu và thời gian xử lý. Lưu ý rằng đây là API của nhà phát triển và nó có khả năng được cải thiện (tức là, nhiều thông tin được báo cáo) trong tương lai.



Điều chỉnh hiệu suất

Nhận hiệu suất tốt nhất từ ​​một ứng dụng phát trực tuyến Spark trên một cụm đòi hỏi một chút điều chỉnh. Phần này giải thích một số tham số và cấu hình có thể được điều chỉnh để cải thiện hiệu suất của ứng dụng bạn. Ở cấp độ cao, bạn cần xem xét hai điều:

  1. Giảm thời gian xử lý của từng lô dữ liệu bằng cách sử dụng hiệu quả các tài nguyên cụm.

  2. Đặt đúng kích thước lô sao cho các lô dữ liệu có thể được xử lý nhanh như chúng được nhận (nghĩa là xử lý dữ liệu theo kịp với việc nhập dữ liệu).

Giảm thời gian xử lý hàng loạt

Có một số tối ưu hóa có thể được thực hiện trong Spark để giảm thiểu thời gian xử lý của mỗi lô. Những điều này đã được thảo luận chi tiết trong hướng dẫn điều chỉnh. Phần này nêu bật một số trong những phần quan trọng nhất.

Mức độ song song trong dữ liệu nhận

Nhận dữ liệu qua mạng (như kafka, ổ cắm, v.v.) yêu cầu dữ liệu phải được phân biệt hóa và lưu trữ trong Spark. Nếu việc nhận dữ liệu trở thành nút cổ chai trong hệ thống, thì hãy xem xét song song hóa việc nhận dữ liệu. Lưu ý rằng mỗi Dstream đầu vào tạo ra một máy thu duy nhất (chạy trên máy công nhân) nhận được một luồng dữ liệu duy nhất. Do đó, việc nhận nhiều luồng dữ liệu có thể đạt được bằng cách tạo nhiều DStream đầu vào và định cấu hình chúng để nhận các phân vùng khác nhau của luồng dữ liệu từ (các) nguồn. Ví dụ, một Dstream đầu vào Kafka duy nhất nhận được hai chủ đề dữ liệu có thể được chia thành hai luồng đầu vào Kafka, mỗi chủ đề chỉ nhận được một chủ đề. Điều này sẽ chạy hai máy thu, cho phép dữ liệu được nhận song song, do đó tăng thông lượng tổng thể. Nhiều DSTREAM này có thể được kết hợp với nhau để tạo ra một Dstream duy nhất. Sau đó, các phép biến đổi đang được áp dụng trên một Dstream đầu vào duy nhất có thể được áp dụng trên luồng thống nhất. Điều này được thực hiện như sau.

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
1

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
2

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
3

Một tham số khác nên được xem xét là khoảng thời gian khối của máy thu, được xác định bởi tham số cấu hình

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
45. Đối với hầu hết các máy thu, dữ liệu nhận được được kết hợp lại với nhau thành các khối dữ liệu trước khi lưu trữ bên trong bộ nhớ Spark Spark. Số lượng khối trong mỗi lô xác định số lượng tác vụ sẽ được sử dụng để xử lý dữ liệu nhận được trong một chuyển đổi giống như bản đồ. Số lượng nhiệm vụ trên mỗi máy thu trên mỗi lô sẽ xấp xỉ (khoảng thời gian / khoảng thời gian). Ví dụ: khoảng thời gian khối 200 ms sẽ tạo ra 10 nhiệm vụ cho mỗi 2 lô. Nếu số lượng tác vụ quá thấp (nghĩa là, ít hơn số lượng lõi trên mỗi máy), thì nó sẽ không hiệu quả vì tất cả các lõi có sẵn sẽ không được sử dụng để xử lý dữ liệu. Để tăng số lượng nhiệm vụ cho một khoảng thời gian hàng loạt nhất định, hãy giảm khoảng thời gian khối. Tuy nhiên, giá trị tối thiểu được đề xuất của khoảng thời gian khối là khoảng 50 ms, dưới đó chi phí khởi động nhiệm vụ có thể là một vấn đề.

Một giải pháp thay thế cho việc nhận dữ liệu với nhiều luồng / máy thu đầu vào là để sửa lại một cách rõ ràng luồng dữ liệu đầu vào (sử dụng

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
46). Điều này phân phối các lô dữ liệu nhận được trên số lượng máy được chỉ định trong cụm trước khi xử lý thêm.

Để biết luồng trực tiếp, vui lòng tham khảo SPARK Streaming + Kafka Hướng dẫn tích hợp

Mức độ song song trong xử lý dữ liệu

Tài nguyên cụm có thể được sử dụng dưới mức nếu số lượng các tác vụ song song được sử dụng trong bất kỳ giai đoạn nào của tính toán không đủ cao. Ví dụ: đối với các hoạt động giảm phân tán như

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
87 và
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
90, số lượng tác vụ song song mặc định được kiểm soát bởi thuộc tính cấu hình
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
64. Bạn có thể vượt qua mức độ song song dưới dạng đối số (xem tài liệu
import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
50) hoặc đặt thuộc tính cấu hình
set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
64 để thay đổi mặc định.

Tuần tự hóa dữ liệu

Các chi phí của tuần tự hóa dữ liệu có thể được giảm bằng cách điều chỉnh các định dạng tuần tự hóa. Trong trường hợp phát trực tuyến, có hai loại dữ liệu đang được tuần tự hóa.

  • Dữ liệu đầu vào: Theo mặc định, dữ liệu đầu vào nhận được thông qua các máy thu được lưu trữ trong bộ nhớ của người thực thi với Storagelevel.Memory_and_Disk_Ser_2. Đó là, dữ liệu được tuần tự hóa thành byte để giảm chi phí GC và được sao chép để dung nạp các lỗi thực thi. Ngoài ra, dữ liệu được giữ đầu tiên trong bộ nhớ và chỉ được đổ vào đĩa nếu bộ nhớ không đủ để chứa tất cả các dữ liệu đầu vào cần thiết cho tính toán phát trực tuyến. Việc tuần tự hóa này rõ ràng có chi phí-người nhận phải giảm dần dữ liệu nhận được và tiếp nối lại nó bằng định dạng tuần tự hóa Spark Spark.: By default, the input data received through Receivers is stored in the executors’ memory with StorageLevel.MEMORY_AND_DISK_SER_2. That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is insufficient to hold all of the input data necessary for the streaming computation. This serialization obviously has overheads – the receiver must deserialize the received data and re-serialize it using Spark’s serialization format.

  • Các RDD tồn tại được tạo ra bởi các hoạt động phát trực tuyến: RDD được tạo bởi các tính toán phát trực tuyến có thể được tồn tại trong bộ nhớ. Ví dụ, các hoạt động cửa sổ vẫn tồn tại dữ liệu trong bộ nhớ vì chúng sẽ được xử lý nhiều lần. Tuy nhiên, không giống như mặc định lõi Spark của storagelevel.memory_only, các RDD tồn tại được tạo ra bởi các tính toán phát trực tuyến được duy trì với storagelevel.memory_only_ser (tức là mặc định) theo mặc định để giảm thiểu chi phí GC.: RDDs generated by streaming computations may be persisted in memory. For example, window operations persist data in memory as they would be processed multiple times. However, unlike the Spark Core default of StorageLevel.MEMORY_ONLY, persisted RDDs generated by streaming computations are persisted with StorageLevel.MEMORY_ONLY_SER (i.e. serialized) by default to minimize GC overheads.

Trong cả hai trường hợp, sử dụng tuần tự hóa Kryo có thể làm giảm cả chi phí CPU và bộ nhớ. Xem Hướng dẫn điều chỉnh tia lửa để biết thêm chi tiết. Đối với Kryo, hãy xem xét đăng ký các lớp tùy chỉnh và vô hiệu hóa theo dõi tham chiếu đối tượng (xem các cấu hình liên quan đến Kryo trong hướng dẫn cấu hình).

Trong các trường hợp cụ thể mà lượng dữ liệu cần được giữ lại cho ứng dụng phát trực tuyến không lớn, có thể khả thi đối với dữ liệu tồn tại (cả hai loại) như các đối tượng bị khử khử mà không phải chịu các chi phí GC quá mức. Ví dụ: nếu bạn đang sử dụng các khoảng thời gian hàng loạt trong vài giây và không có hoạt động cửa sổ nào, thì bạn có thể thử vô hiệu hóa tuần tự hóa trong dữ liệu tồn tại bằng cách thiết lập rõ ràng mức lưu trữ cho phù hợp. Điều này sẽ làm giảm chi phí CPU do tuần tự hóa, có khả năng cải thiện hiệu suất mà không cần quá nhiều chi phí GC.

Nhiệm vụ khởi động chi phí

Nếu số lượng tác vụ được khởi chạy mỗi giây là cao (giả sử, 50 trở lên mỗi giây), thì chi phí gửi các nhiệm vụ cho người thực thi có thể rất đáng kể và sẽ khiến bạn khó đạt được độ trễ thứ hai. Chi phí có thể được giảm bằng các thay đổi sau:

  • Chế độ thực thi: Chạy Spark ở chế độ độc lập hoặc chế độ mesos hạt thô dẫn đến thời gian khởi chạy tác vụ tốt hơn so với chế độ mesos hạt mịn. Vui lòng tham khảo Hướng dẫn chạy trên Mesos để biết thêm chi tiết.: Running Spark in Standalone mode or coarse-grained Mesos mode leads to better task launch times than the fine-grained Mesos mode. Please refer to the Running on Mesos guide for more details.

Những thay đổi này có thể làm giảm thời gian xử lý hàng loạt xuống 100 mili giây, do đó cho phép kích thước lô dưới giây có thể khả thi.


Đặt đúng khoảng thời gian đợt

Đối với một ứng dụng phát trực tuyến Spark chạy trên một cụm để ổn định, hệ thống sẽ có thể xử lý dữ liệu nhanh như nhận được. Nói cách khác, các lô dữ liệu nên được xử lý nhanh như chúng đang được tạo. Liệu điều này có đúng với một ứng dụng có thể được tìm thấy bằng cách theo dõi thời gian xử lý trong giao diện người dùng web phát trực tuyến hay không, trong đó thời gian xử lý hàng loạt sẽ nhỏ hơn khoảng thời gian hàng loạt.

Tùy thuộc vào bản chất của tính toán phát trực tuyến, khoảng thời gian hàng loạt được sử dụng có thể có tác động đáng kể đến tốc độ dữ liệu có thể được duy trì bởi ứng dụng trên một bộ tài nguyên cụm cố định. Ví dụ: chúng ta hãy xem xét ví dụ WordCountNetwork trước đó. Đối với một tốc độ dữ liệu cụ thể, hệ thống có thể theo kịp các từ báo cáo cứ sau 2 giây (tức là, khoảng thời gian hàng loạt là 2 giây), nhưng không phải cứ sau 500 mili giây. Vì vậy, khoảng thời gian hàng loạt cần được đặt sao cho tốc độ dữ liệu dự kiến ​​trong sản xuất có thể được duy trì.

Một cách tiếp cận tốt để tìm ra kích thước lô phù hợp cho ứng dụng của bạn là kiểm tra nó với khoảng thời gian hàng loạt bảo thủ (giả sử, 5-10 giây) và tốc độ dữ liệu thấp. Để xác minh xem hệ thống có thể theo kịp tốc độ dữ liệu hay không, bạn có thể kiểm tra giá trị của độ trễ từ đầu đến cuối được trải nghiệm bởi từng lô đã xử lý (tìm kiếm tổng số độ trễ của Google Giao diện StreamingListener). Nếu độ trễ được duy trì để so sánh với kích thước lô, thì hệ thống vẫn ổn định. Mặt khác, nếu độ trễ liên tục tăng, điều đó có nghĩa là hệ thống không thể theo kịp và do đó nó không ổn định. Khi bạn có ý tưởng về cấu hình ổn định, bạn có thể thử tăng tốc độ dữ liệu và/hoặc giảm kích thước lô. Lưu ý rằng sự gia tăng nhất thời của độ trễ do tăng tốc độ dữ liệu tạm thời có thể ổn miễn là độ trễ giảm trở lại giá trị thấp (nghĩa là, nhỏ hơn kích thước lô).


Điều chỉnh bộ nhớ

Điều chỉnh việc sử dụng bộ nhớ và hành vi GC của các ứng dụng Spark đã được thảo luận rất chi tiết trong hướng dẫn điều chỉnh. Nó được khuyến nghị mạnh mẽ rằng bạn đọc nó. Trong phần này, chúng tôi thảo luận về một vài tham số điều chỉnh cụ thể trong bối cảnh của các ứng dụng phát trực tuyến tia lửa.

Lượng bộ nhớ cụm được yêu cầu bởi một ứng dụng phát tia tia lửa phụ thuộc rất nhiều vào loại biến đổi được sử dụng. Ví dụ: nếu bạn muốn sử dụng thao tác cửa sổ trong 10 phút cuối cùng của dữ liệu, thì cụm của bạn sẽ có đủ bộ nhớ để chứa dữ liệu trị giá 10 phút trong bộ nhớ. Hoặc nếu bạn muốn sử dụng

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
66 với một số lượng lớn các phím, thì bộ nhớ cần thiết sẽ cao. Ngược lại, nếu bạn muốn thực hiện một thao tác lọc bản đồ đơn giản, thì bộ nhớ cần thiết sẽ thấp.

Nói chung, vì dữ liệu nhận được thông qua các máy thu được lưu trữ với storagelevel.memory_and_disk_ser_2, dữ liệu không phù hợp với bộ nhớ sẽ tràn vào đĩa. Điều này có thể làm giảm hiệu suất của ứng dụng phát trực tuyến và do đó nên cung cấp đủ bộ nhớ theo yêu cầu của ứng dụng phát trực tuyến của bạn. Tốt nhất là thử và xem việc sử dụng bộ nhớ ở quy mô nhỏ và ước tính tương ứng.

Một khía cạnh khác của điều chỉnh bộ nhớ là bộ sưu tập rác. Đối với một ứng dụng phát trực tuyến yêu cầu độ trễ thấp, không mong muốn có các tạm dừng lớn do bộ sưu tập rác JVM gây ra.

Có một vài tham số có thể giúp bạn điều chỉnh việc sử dụng bộ nhớ và chi phí GC:

  • Mức độ bền bỉ của DSTREAM: Như đã đề cập trước đó trong phần tuần tự hóa dữ liệu, dữ liệu đầu vào và RDD theo mặc định vẫn tồn tại khi các byte tuần tự hóa. Điều này làm giảm cả việc sử dụng bộ nhớ và chi phí GC, so với sự kiên trì bị khử. Kích hoạt tuần tự hóa Kryo làm giảm thêm kích thước nối tiếp và sử dụng bộ nhớ. Giảm thêm sử dụng bộ nhớ có thể đạt được khi nén (xem cấu hình tia lửa

    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    53), với chi phí của thời gian CPU.: As mentioned earlier in the Data Serialization section, the input data and RDDs are by default persisted as serialized bytes. This reduces both the memory usage and GC overheads, compared to deserialized persistence. Enabling Kryo serialization further reduces serialized sizes and memory usage. Further reduction in memory usage can be achieved with compression (see the Spark configuration
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    53), at the cost of CPU time.

  • Xóa dữ liệu cũ: Theo mặc định, tất cả dữ liệu đầu vào và các RDD được tạo ra được tạo bởi các phép biến đổi Dstream đều được xóa tự động. Phát trực tuyến Spark quyết định khi nào nên xóa dữ liệu dựa trên các phép biến đổi được sử dụng. Ví dụ: nếu bạn đang sử dụng hoạt động cửa sổ 10 phút, thì Spark Streaming sẽ giữ khoảng 10 phút cuối cùng của dữ liệu và chủ động vứt bỏ dữ liệu cũ hơn. Dữ liệu có thể được giữ lại trong một thời gian dài hơn (ví dụ: truy vấn tương tác dữ liệu cũ hơn) bằng cách đặt

    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    54.: By default, all input data and persisted RDDs generated by DStream transformations are automatically cleared. Spark Streaming decides when to clear the data based on the transformations that are used. For example, if you are using a window operation of 10 minutes, then Spark Streaming will keep around the last 10 minutes of data, and actively throw away older data. Data can be retained for a longer duration (e.g. interactively querying older data) by setting
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    54.

  • Trình thu thập rác CMS: Sử dụng GC đánh dấu và quét đồng thời được khuyến nghị mạnh mẽ để giữ các tạm dừng liên quan đến GC luôn thấp. Mặc dù GC đồng thời được biết là giảm thông lượng xử lý tổng thể của hệ thống, việc sử dụng nó vẫn được khuyến nghị để đạt được thời gian xử lý lô phù hợp hơn. Đảm bảo bạn đặt CMS GC trên cả trình điều khiển (sử dụng

    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    55 trong
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    27) và người thực thi (sử dụng cấu hình Spark
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    57).
    : Use of the concurrent mark-and-sweep GC is strongly recommended for keeping GC-related pauses consistently low. Even though concurrent GC is known to reduce the overall processing throughput of the system, its use is still recommended to achieve more consistent batch processing times. Make sure you set the CMS GC on both the driver (using
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    55 in
    set 1.9240000247955322
    list 8.82200002670288
    tuple 7.031999826431274
    
    27) and the executors (using Spark configuration
    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    57).

  • Các mẹo khác: Để giảm thêm chi phí GC, đây là một số mẹo khác để thử.: To further reduce GC overheads, here are some more tips to try.

    • RDD tồn tại bằng cách sử dụng mức lưu trữ
      import time
      
      def testFilter ( n, test, rangeSize ):
          for method in ( set, list, tuple ):
              t = time.time()
              for i in range( n ):
                  method( filter( test, range( rangeSize ) ) )
              print( method.__name__, ( time.time() - t ) )
      
      testFilter( 100000, lambda x: x % 3 == 0, 1000 )
      
      58. Xem thêm chi tiết trong Hướng dẫn lập trình tia lửa.
    • Sử dụng nhiều người thực thi hơn với kích thước đống nhỏ hơn. Điều này sẽ làm giảm áp suất GC trong mỗi đống JVM.

Những điểm quan trọng cần nhớ:
  • Một dstream được liên kết với một máy thu đơn. Để đạt được đọc song song, nhiều máy thu, tức là nhiều dstream cần được tạo. Một người nhận được chạy trong một người thực thi. Nó chiếm một lõi. Đảm bảo rằng có đủ các lõi để xử lý sau khi các khe máy thu được đặt, tức là

    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    59 nên tính đến các khe cắm máy thu. Các máy thu được phân bổ cho người điều hành theo kiểu Robin tròn.

  • Khi dữ liệu được nhận từ nguồn luồng, người nhận sẽ tạo các khối dữ liệu. Một khối dữ liệu mới được tạo ra mỗi mili giây. N khối dữ liệu được tạo trong BatchInterVal trong đó n = BatchInterval/blockInterval. Các khối này được phân phối bởi người quản lý blockmanager của người thực thi hiện tại cho các nhà quản lý khối của các nhà điều hành khác. Sau đó, trình theo dõi đầu vào mạng chạy trên trình điều khiển được thông báo về các vị trí khối để xử lý thêm.

  • Một RDD được tạo trên trình điều khiển cho các khối được tạo trong BatchInterVal. Các khối được tạo ra trong BatchInterVal là các phân vùng của RDD. Mỗi phân vùng là một nhiệm vụ trong Spark. BlockInterval == BatchInterVal có nghĩa là một phân vùng duy nhất được tạo và có lẽ nó được xử lý cục bộ.

  • Các tác vụ bản đồ trên các khối được xử lý trong các thực thi (một tác phẩm đã nhận được khối và một trong đó khối được sao chép) có các khối không phân biệt khoảng thời gian khối, trừ khi lập lịch không địa phương khởi động. Có khối khối lớn hơn có nghĩa là các khối lớn hơn. Giá trị cao

    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    60 làm tăng cơ hội xử lý một khối trên nút cục bộ. Một sự cân bằng cần được tìm thấy giữa hai tham số này để đảm bảo rằng các khối lớn hơn được xử lý cục bộ.

  • Thay vì dựa vào BatchInterVal và BlockInterVal, bạn có thể xác định số lượng phân vùng bằng cách gọi

    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    61. Điều này kể lại dữ liệu trong RDD một cách ngẫu nhiên để tạo n số phân vùng. Vâng, cho sự song song lớn hơn. Mặc dù đến với chi phí của một sự xáo trộn. Một quá trình xử lý RDD được lên lịch bởi người lái xe công việc như một công việc. Tại một thời điểm nhất định, chỉ có một công việc hoạt động. Vì vậy, nếu một công việc đang thực hiện các công việc khác được xếp hàng.

  • Nếu bạn có hai DSTREAM sẽ có hai RDD được hình thành và sẽ có hai công việc được tạo ra sẽ được lên lịch sau một công việc khác. Để tránh điều này, bạn có thể kết hợp hai dstreams. Điều này sẽ đảm bảo rằng một công đoàn duy nhất được hình thành cho hai RDD của Dstreams. Liên minh này sau đó được coi là một công việc duy nhất. Tuy nhiên, việc phân vùng RDD không bị ảnh hưởng.

  • Nếu thời gian xử lý hàng loạt nhiều hơn BatchInterVal thì rõ ràng bộ nhớ của máy thu sẽ bắt đầu lấp đầy và sẽ kết thúc các ngoại lệ ném (có thể là blocknotfoundException). Hiện tại, không có cách nào để tạm dừng người nhận. Sử dụng cấu hình SparkConf

    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    38, tốc độ thu có thể bị hạn chế.



Ngữ nghĩa chịu lỗi

Trong phần này, chúng tôi sẽ thảo luận về hành vi của các ứng dụng phát trực tuyến Spark trong trường hợp thất bại.

Tiểu sử

Để hiểu các ngữ nghĩa được cung cấp bởi Spark Streaming, chúng ta hãy nhớ ngữ nghĩa khả năng dung nạp lỗi cơ bản của RDD Spark.

  1. Một RDD là một bộ dữ liệu phân tán, có thể tính toán lại, phân tán. Mỗi RDD nhớ các dòng hoạt động xác định được sử dụng trên bộ dữ liệu đầu vào chịu lỗi để tạo nó.
  2. Nếu bất kỳ phân vùng nào của RDD bị mất do lỗi nút công nhân, thì phân vùng đó có thể được tính toán lại từ bộ dữ liệu chịu lỗi ban đầu bằng cách sử dụng dòng hoạt động.
  3. Giả sử rằng tất cả các phép biến đổi RDD đều có tính xác định, dữ liệu trong RDD được chuyển đổi cuối cùng sẽ luôn giống nhau bất kể các lỗi trong cụm tia lửa.

Spark hoạt động trên dữ liệu trong các hệ thống tệp chịu lỗi như HDFS hoặc S3. Do đó, tất cả các RDD được tạo từ dữ liệu chịu lỗi cũng chịu lỗi. Tuy nhiên, đây không phải là trường hợp phát trực tuyến tia lửa vì dữ liệu trong hầu hết các trường hợp được nhận qua mạng (trừ khi

set 1.9240000247955322
list 8.82200002670288
tuple 7.031999826431274
54 được sử dụng). Để đạt được các thuộc tính chịu lỗi tương tự cho tất cả các RDD được tạo, dữ liệu nhận được được sao chép giữa nhiều trình thực thi Spark trong các nút công nhân trong cụm (hệ số sao chép mặc định là 2). Điều này dẫn đến hai loại dữ liệu trong hệ thống cần phục hồi trong trường hợp thất bại:

  1. Dữ liệu nhận được và sao chép - Dữ liệu này tồn tại thất bại của một nút công nhân như một bản sao của nó tồn tại trên một trong các nút khác.
  2. Dữ liệu nhận được nhưng được đệm để sao chép - vì điều này không được sao chép, cách duy nhất để khôi phục dữ liệu này là lấy lại từ nguồn.

Hơn nữa, có hai loại thất bại mà chúng ta nên quan tâm:

  1. Thất bại của một nút công nhân - Bất kỳ nút công nhân nào chạy các trình điều hành đều có thể thất bại và tất cả dữ liệu trong bộ nhớ trên các nút đó sẽ bị mất. Nếu bất kỳ người nhận nào đang chạy trên các nút không thành công, thì dữ liệu được đệm của chúng sẽ bị mất.
  2. Thất bại của nút Trình điều khiển - Nếu nút trình điều khiển chạy ứng dụng phát luồng Spark không thành công, thì rõ ràng SparkContext bị mất và tất cả các Trình thực hiện với dữ liệu trong bộ nhớ của họ đều bị mất.

Với kiến ​​thức cơ bản này, hãy để chúng tôi hiểu ngữ nghĩa chịu lỗi của phát trực tuyến tia lửa.

Định nghĩa

Các ngữ nghĩa của các hệ thống phát trực tuyến thường được nắm bắt về số lần mỗi bản ghi có thể được xử lý bởi hệ thống. Có ba loại đảm bảo rằng một hệ thống có thể cung cấp trong tất cả các điều kiện hoạt động có thể (mặc dù thất bại, v.v.)

  1. Nhiều nhất một lần: mỗi bản ghi sẽ được xử lý một lần hoặc không được xử lý.
  2. Ít nhất một lần: mỗi bản ghi sẽ được xử lý một hoặc nhiều lần. Điều này mạnh hơn AT-MOST một lần vì nó đảm bảo rằng không có dữ liệu nào bị mất. Nhưng có thể có các bản sao.
  3. Chính xác một lần: Mỗi bản ghi sẽ được xử lý chính xác một lần - không có dữ liệu nào bị mất và không có dữ liệu nào được xử lý nhiều lần. Đây rõ ràng là sự đảm bảo mạnh mẽ nhất của ba.

Ngữ nghĩa cơ bản

Trong bất kỳ hệ thống xử lý luồng nào, nói rộng ra, có ba bước trong việc xử lý dữ liệu.

  1. Nhận dữ liệu: Dữ liệu được nhận từ các nguồn sử dụng máy thu hoặc cách khác.

  2. Chuyển đổi dữ liệu: Dữ liệu nhận được được chuyển đổi bằng cách sử dụng các phép biến đổi DSTREAM và RDD.

  3. Đẩy ra dữ liệu: Dữ liệu được chuyển đổi cuối cùng được đẩy ra các hệ thống bên ngoài như hệ thống tệp, cơ sở dữ liệu, bảng điều khiển, v.v.

Nếu một ứng dụng phát trực tuyến phải đạt được các bảo đảm chính xác từ đầu đến cuối, thì mỗi bước phải cung cấp bảo đảm chính xác. Đó là, mỗi bản ghi phải được nhận chính xác một lần, được chuyển đổi chính xác một lần và được đẩy vào các hệ thống hạ nguồn chính xác một lần. Hãy để hiểu về ngữ nghĩa của các bước này trong bối cảnh phát trực tuyến tia lửa.

  1. Nhận dữ liệu: Các nguồn đầu vào khác nhau cung cấp các đảm bảo khác nhau. Điều này được thảo luận chi tiết trong tiểu mục tiếp theo.

  2. Chuyển đổi dữ liệu: Tất cả dữ liệu đã nhận được sẽ được xử lý chính xác một lần, nhờ các đảm bảo mà RDD cung cấp. Ngay cả khi có những thất bại, miễn là dữ liệu đầu vào nhận được có thể truy cập được, các RDD được chuyển đổi cuối cùng sẽ luôn có cùng nội dung.

  3. Đẩy ra dữ liệu: hoạt động đầu ra theo mặc định đảm bảo tại địa chỉ một lần ngữ nghĩa vì nó phụ thuộc vào loại hoạt động đầu ra (idempotent, hoặc không) và ngữ nghĩa của hệ thống hạ nguồn (hỗ trợ giao dịch hoặc không). Nhưng người dùng có thể thực hiện các cơ chế giao dịch của riêng họ để đạt được ngữ nghĩa chính xác. Điều này được thảo luận chi tiết hơn sau này trong phần.

Ngữ nghĩa của dữ liệu nhận được

Các nguồn đầu vào khác nhau cung cấp các đảm bảo khác nhau, từ At-least một lần đến chính xác một lần. Đọc để biết thêm chi tiết.

Với các tập tin

Nếu tất cả các dữ liệu đầu vào đã có trong một hệ thống tệp chịu lỗi như HDF, việc phát luồng Spark luôn có thể khôi phục sau mọi lỗi và xử lý tất cả dữ liệu. Điều này cung cấp các ngữ nghĩa chính xác, có nghĩa là tất cả các dữ liệu sẽ được xử lý chính xác một khi không có vấn đề gì thất bại.

Với các nguồn dựa trên người nhận

Đối với các nguồn đầu vào dựa trên máy thu, ngữ nghĩa khả năng chịu lỗi phụ thuộc vào cả kịch bản thất bại và loại máy thu. Như chúng ta đã thảo luận trước đó, có hai loại máy thu:

  1. Máy thu đáng tin cậy - Những người nhận này chỉ thừa nhận các nguồn đáng tin cậy sau khi đảm bảo rằng dữ liệu nhận được đã được sao chép. Nếu một người nhận như vậy không thành công, nguồn sẽ không nhận được xác nhận cho dữ liệu được đệm (không thể thay thế). Do đó, nếu máy thu được khởi động lại, nguồn sẽ gửi lại dữ liệu và không có dữ liệu nào bị mất do lỗi.
  2. Người nhận không đáng tin cậy - Những người nhận như vậy không gửi xác nhận và do đó có thể mất dữ liệu khi họ thất bại do thất bại của công nhân hoặc người lái xe.

Tùy thuộc vào loại máy thu được sử dụng, chúng tôi đạt được ngữ nghĩa sau. Nếu một nút công nhân không thành công, thì không có mất dữ liệu với các máy thu đáng tin cậy. Với những người nhận không đáng tin cậy, dữ liệu nhận được nhưng không được sao chép có thể bị mất. Nếu nút trình điều khiển không thành công, thì bên cạnh những tổn thất này, tất cả các dữ liệu trong quá khứ đã được nhận và nhân rộng trong bộ nhớ sẽ bị mất. Điều này sẽ ảnh hưởng đến kết quả của các biến đổi trạng thái.

Để tránh mất dữ liệu nhận được trong quá khứ này, Spark 1.2 đã giới thiệu các bản ghi ghi trước để lưu dữ liệu nhận được vào lưu trữ chịu lỗi. Với các nhật ký ghi lại được bật và máy thu đáng tin cậy, không mất dữ liệu. Về mặt ngữ nghĩa, nó cung cấp một sự đảm bảo một lần.

Bảng sau đây tóm tắt các ngữ nghĩa trong các thất bại:

Kịch bản triển khaiCông nhân thất bạiThất bại lái xe
Spark 1.1 hoặc sớm hơn, hoặc Spark 1.2 trở lên mà không cần ghi lại ghi lại
Spark 1.2 or later without write-ahead logs
Dữ liệu được đệm bị mất với các máy thu không đáng tin cậy mất dữ liệu với các máy thu đáng tin cậy tại địa điểm
Zero data loss with reliable receivers
At-least once semantics
Dữ liệu được đệm bị mất với người nhận không đáng tin cậy dữ liệu quá khứ bị mất với tất cả các bộ nhận không xác định ngữ nghĩa
Past data lost with all receivers
Undefined semantics
Spark 1.2 trở lên với nhật ký ghi lại Mất dữ liệu bằng không với các máy thu đáng tin cậy tại địa điểm
At-least once semantics
Mất dữ liệu bằng không với các máy thu và tệp đáng tin cậy
At-least once semantics

Với API trực tiếp Kafka

Trong Spark 1.3, chúng tôi đã giới thiệu API trực tiếp KAFKA mới, có thể đảm bảo rằng tất cả dữ liệu Kafka được nhận bằng cách phát trực tuyến Spark chính xác một lần. Cùng với điều này, nếu bạn thực hiện hoạt động đầu ra chính xác, bạn có thể đạt được các bảo đảm chính xác từ đầu đến cuối. Cách tiếp cận này được thảo luận thêm trong Hướng dẫn tích hợp Kafka.

Ngữ nghĩa của hoạt động đầu ra

Các hoạt động đầu ra (như

import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
64) có tại địa chỉ một khi ngữ nghĩa, nghĩa là dữ liệu được chuyển đổi có thể được ghi vào một thực thể bên ngoài nhiều lần trong trường hợp thất bại của công nhân. Mặc dù điều này có thể chấp nhận để lưu vào các hệ thống tệp bằng cách sử dụng các hoạt động
import time

def testFilter ( n, test, rangeSize ):
    for method in ( set, list, tuple ):
        t = time.time()
        for i in range( n ):
            method( filter( test, range( rangeSize ) ) )
        print( method.__name__, ( time.time() - t ) )

testFilter( 100000, lambda x: x % 3 == 0, 1000 )
65 (vì tệp sẽ đơn giản được ghi đè bằng cùng một dữ liệu), nhưng nỗ lực bổ sung có thể cần thiết để đạt được ngữ nghĩa chính xác. Có hai cách tiếp cận.

  • Cập nhật IDEMPOTENT: Nhiều lần thử luôn ghi cùng một dữ liệu. Ví dụ:

    import time
    
    def testFilter ( n, test, rangeSize ):
        for method in ( set, list, tuple ):
            t = time.time()
            for i in range( n ):
                method( filter( test, range( rangeSize ) ) )
            print( method.__name__, ( time.time() - t ) )
    
    testFilter( 100000, lambda x: x % 3 == 0, 1000 )
    
    65 luôn ghi cùng một dữ liệu vào các tệp được tạo.

  • Cập nhật giao dịch: Tất cả các bản cập nhật được thực hiện giao dịch để các bản cập nhật được thực hiện chính xác một lần về mặt nguyên tử. Một cách để làm điều này sẽ là như sau.

    • Sử dụng thời gian hàng loạt (có sẵn trong
      import time
      
      def testFilter ( n, test, rangeSize ):
          for method in ( set, list, tuple ):
              t = time.time()
              for i in range( n ):
                  method( filter( test, range( rangeSize ) ) )
              print( method.__name__, ( time.time() - t ) )
      
      testFilter( 100000, lambda x: x % 3 == 0, 1000 )
      
      64) và chỉ mục phân vùng của RDD để tạo một định danh. Định danh này xác định duy nhất một dữ liệu blob trong ứng dụng phát trực tuyến.
    • Cập nhật hệ thống bên ngoài với Blob này giao dịch (nghĩa là chính xác một lần, về mặt nguyên tử) bằng cách sử dụng định danh. Đó là, nếu định danh chưa được cam kết, hãy cam kết dữ liệu phân vùng và định danh về mặt nguyên tử. Khác, nếu điều này đã được cam kết, hãy bỏ qua bản cập nhật.

      ssc.start()             // Start the computation
      ssc.awaitTermination()  // Wait for the computation to terminate
      4



Đi đâu từ đây

  • Hướng dẫn bổ sung
    • Hướng dẫn tích hợp Kafka
    • Hướng dẫn tích hợp Kinesis
    • Hướng dẫn nhận tùy chỉnh
  • Nguồn dữ liệu Dstream của bên thứ ba có thể được tìm thấy trong các dự án của bên thứ ba
  • Tài liệu API
    • Scala Docs
      • StreamingContext và Dstream
      • Kafkautils, Kinesisutils,
    • Tài liệu Java
      • Javastreamingcontext, javadstream và javapairdstream
      • Kafkautils, Kinesisutils
    • Tài liệu Python
      • StreamingContext và Dstream
  • Kafkautils, Kinesisutils,
  • Tài liệu Java