Transformasi Stateful Kumulatif Dalam Streaming Apache Spark



Entri blog ini membahas transformasi stateful di Spark Streaming. Pelajari semua tentang pelacakan kumulatif dan peningkatan keterampilan untuk karier Hadoop Spark.

Kontribusi Prithviraj Bose

Di blog saya sebelumnya, saya telah membahas transformasi stateful menggunakan konsep windowing dari Apache Spark Streaming. Anda bisa membacanya sini .





Dalam posting ini saya akan membahas operasi stateful kumulatif di Apache Spark Streaming. Jika Anda baru mengenal Spark Streaming, saya sangat menyarankan Anda untuk membaca blog saya sebelumnya untuk memahami cara kerja windowing.

Jenis Transformasi Stateful dalam Spark Streaming (Lanjutan…)

> Pelacakan kumulatif

Kami telah menggunakan reduceByKeyAndWindow (…) API untuk melacak status kunci, namun windowing menimbulkan batasan untuk kasus penggunaan tertentu. Bagaimana jika kita ingin mengakumulasikan status kunci secara keseluruhan daripada membatasinya ke jendela waktu? Dalam hal ini kita perlu menggunakan updateStateByKey (…) API.



API ini diperkenalkan di Spark 1.3.0 dan sangat populer. Namun, API ini memiliki beberapa kelebihan kinerja, kinerjanya menurun seiring dengan meningkatnya ukuran status dari waktu ke waktu. Saya telah menulis contoh untuk menunjukkan penggunaan API ini. Anda dapat menemukan kodenya sini .

Spark 1.6.0 memperkenalkan API baru mapWithState (…) yang memecahkan overhead kinerja yang ditimbulkan updateStateByKey (…) . Di blog ini saya akan membahas API khusus ini menggunakan contoh program yang telah saya tulis. Anda dapat menemukan kodenya sini .

program java untuk terhubung ke database mysql

Sebelum saya menyelami panduan kode, mari kita luangkan beberapa kata tentang pos pemeriksaan. Untuk transformasi stateful apa pun, checkpointing adalah wajib. Checkpointing adalah mekanisme untuk memulihkan status kunci jika program driver gagal. Ketika driver restart, status kunci dipulihkan dari file pos pemeriksaan. Lokasi pos pemeriksaan biasanya HDFS atau Amazon S3 atau penyimpanan yang andal lainnya. Saat menguji kode, seseorang juga dapat menyimpan di sistem file lokal.



Dalam program contoh, kami mendengarkan aliran teks soket pada host = localhost dan port = 9999. Ini memberi token aliran masuk ke dalam (kata, jumlah kejadian) dan melacak jumlah kata menggunakan API 1.6.0 mapWithState (…) . Selain itu, kunci tanpa pembaruan akan dihapus menggunakan StateSpec.timeout API. Kami checkpointing di HDFS dan frekuensi checkpointing setiap 20 detik.

Pertama-tama, buat sesi Spark Streaming,

Spark-streaming-session

Kami membuat checkpointDir di HDFS dan kemudian panggil metode objek getOrCreate (…) . Itu getOrCreate API memeriksa checkpointDir untuk melihat apakah ada status sebelumnya untuk dipulihkan, jika itu ada, maka sesi Spark Streaming akan dibuat ulang dan memperbarui status kunci dari data yang disimpan dalam file sebelum melanjutkan dengan data baru. Jika tidak, itu membuat sesi Streaming Spark baru.

Itu getOrCreate mengambil nama direktori pos pemeriksaan dan fungsi (yang telah kami beri nama createFunc ) yang tanda tangannya harus () => StreamingContext .

Mari kita periksa kode di dalamnya createFunc .

Baris # 2: Kami membuat konteks streaming dengan nama pekerjaan ke 'TestMapWithStateJob' dan interval batch = 5 detik.

Baris # 5: Setel direktori pos pemeriksaan.

membuat kelas tunggal di java

Baris # 8: Setel spesifikasi negara bagian menggunakan kelas org.apache.streaming.StateSpec obyek. Pertama-tama kita mengatur fungsi yang akan melacak status, lalu kita mengatur jumlah partisi untuk DStream yang dihasilkan yang akan dihasilkan selama transformasi berikutnya. Akhirnya kami mengatur batas waktu (ke 30 detik) di mana jika ada pembaruan untuk kunci tidak diterima dalam 30 detik maka status kunci akan dihapus.

Baris 12 #: Siapkan aliran soket, ratakan data batch yang masuk, buat pasangan nilai kunci, panggil mapWithState , atur interval pemeriksaan ke 20-an dan terakhir cetak hasilnya.

Kerangka kerja Spark memanggil th e createFunc untuk setiap kunci dengan nilai sebelumnya dan status saat ini. Kami menghitung jumlah dan memperbarui status dengan jumlah kumulatif dan akhirnya kami mengembalikan jumlah untuk kunci tersebut.

c ++ mengurutkan angka dalam urutan menaik

Sumber Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Ada pertanyaan untuk kami? Harap sebutkan di bagian komentar dan kami akan menghubungi Anda kembali.

Posting terkait:

Mulailah dengan Apache Spark & ​​Scala

Transformasi Stateful dengan Windowing di Spark Streaming