DBLog - kerangka kerja umum untuk Ubah Pengambilan Data

Halo semuanya! Kami menawarkan Anda untuk membaca terjemahan artikel, yang kami siapkan khusus untuk siswa dari kursus "Arsitek Beban Tinggi" .




pengantar


Melacak perubahan data (Change Data Capture, CDC) memungkinkan Anda untuk menerima secara real time perubahan yang dilakukan dalam database dan mendistribusikannya ke berbagai konsumen [1] [2]. CDC menjadi semakin populer ketika sinkronisasi antara data warehousing heterogen diperlukan (misalnya, MySQL dan ElasticSearch) dan merupakan alternatif untuk metode tradisional seperti dual-tulis dan transaksi terdistribusi [3] [4].

Sumber untuk CDC, dalam database seperti MySQL dan PostgreSQL, adalah log transaksi (log transaksi). Tetapi karena log transaksi biasanya terpotong, mereka mungkin tidak mengandung seluruh sejarah perubahan. Oleh karena itu, untuk mendapatkan status sumber sepenuhnya, kita perlu dump. Kami memeriksa beberapa proyek CDC open source, sering menggunakan pustaka, API basis data, dan protokol yang sama, dan menemukan sejumlah batasan di dalamnya yang tidak memenuhi persyaratan kami. Misalnya, menghentikan pemrosesan peristiwa log hingga penyelesaian dump (snapshot data lengkap), ketidakmampuan untuk memulai dump dumping berdasarkan permintaan atau implementasi, memengaruhi lalu lintas tulis karena penggunaan kunci meja.

Ini mendorong kami untuk mengembangkan DBLog.dengan pendekatan terpadu untuk memproses log dan dump. Untuk mendukungnya, sejumlah fungsi harus diimplementasikan dalam DBMS, yang sudah ada di MySQL, PostgreSQL, MariaDB, dan sejumlah database lainnya.

Beberapa fitur DBLog:

  • Log peristiwa diproses dalam urutan terjadinya.
  • Dumps dapat dibuat kapan saja untuk semua tabel, untuk satu tabel atau untuk kunci primer spesifik suatu tabel.
  • Pemrosesan log berganti dengan pemrosesan sampah, membagi dump menjadi beberapa blok. Dengan demikian, pemrosesan log dapat terjadi secara paralel dengan pemrosesan dump. Jika proses berakhir maka dapat dilanjutkan setelah blok selesai terakhir tanpa harus memulai dari awal lagi. Ini juga memungkinkan Anda untuk menyesuaikan throughput saat membuat dump dan, jika perlu, menghentikan sementara pembuatannya.
  • , .
  • : , API.
  • . , .


Sebelumnya, kami membahas Delta ( terjemahan ), sebuah platform untuk pengayaan dan sinkronisasi data. Tujuan Delta adalah untuk menyinkronkan beberapa penyimpanan data, di mana salah satunya adalah primer (misalnya, MySQL), dan turunan lainnya (misalnya, ElasticSearch). Salah satu persyaratan pengembangan utama adalah keterlambatan rendah dalam menyebarkan perubahan dari sumber ke penerima, serta ketersediaan tinggi dari aliran acara. Ketentuan ini berlaku terlepas dari apakah semua gudang data digunakan oleh satu tim atau jika satu tim memiliki data dan yang lainnya mengkonsumsinya. Dalam sebuah artikel tentang Delta ( terjemahan ), kami juga menjelaskan kasus penggunaan yang melampaui sinkronisasi data, seperti pemrosesan acara.

Untuk menyinkronkan data dan memproses peristiwa, selain dapat melacak perubahan secara real time, kita harus memenuhi persyaratan berikut:

  • Mendapatkan status penuh . Toko yang diturunkan (seperti ElasticSearch) pada akhirnya harus menyimpan status sumber sepenuhnya. Kami menerapkan ini melalui kesedihan dari database asli.
  • Mulai pemulihan negara kapan saja. Daripada mempertimbangkan dump sebagai operasi satu kali hanya untuk inisialisasi primer, kita dapat melakukannya kapan saja: untuk semua tabel, untuk satu tabel atau untuk kunci primer spesifik. Ini sangat penting untuk pemulihan konsumen jika terjadi kehilangan atau kerusakan data.
  • - . . , (, ). - . , - .
  • . . API, , , . , , , .
  • . Netflix , Kafka, SQS, Kinesis, Netflix, Keystone. , (, ), (, ). . API.
  • . Netflix , (MySQL, PostgreSQL) AWS RDS. .



Kami memeriksa beberapa solusi open source yang ada, termasuk: Maxwell , SpinalTap , Yelp MySQL Streamer, dan Debezium . Dalam hal pengumpulan data, mereka semua bekerja dengan cara yang sama, menggunakan log transaksi. Misalnya, menggunakan protokol replikasi binlog di MySQL atau slot replikasi di PostgreSQL.

Tetapi ketika memproses dump, mereka memiliki setidaknya satu dari batasan berikut:

  • Hentikan pemrosesan log peristiwa saat membuat dump . Akibatnya, jika tempat pembuangan besar, maka pemrosesan peristiwa log berhenti untuk waktu yang lama. Ini akan menjadi masalah jika konsumen mengandalkan penundaan kecil dalam menyebarkan perubahan.
  • . . (, ElasticSearch) .
  • . . [5]. . , . . , PostgreSQL RDS .
  • Menggunakan fungsi basis data tertentu . Kami menemukan bahwa beberapa solusi menggunakan fitur basis data tambahan yang tidak ada pada semua sistem. Misalnya, menggunakan mesin lubang hitam di MySQL atau mendapatkan snapshot dump yang konsisten melalui slot replikasi di PostgreSQL. Kode batas ini digunakan kembali antara database yang berbeda.

Pada akhirnya, kami memutuskan untuk mengambil pendekatan berbeda untuk bekerja dengan dump:

  • log alternatif dan peristiwa dump sehingga mereka dapat dieksekusi bersama;
  • memulai dump kapan saja;
  • Jangan gunakan kunci meja
  • Jangan gunakan fitur basis data tertentu.

Kerangka DBLog


DBLog adalah kerangka kerja java untuk menerima dump dan perubahan secara real time. Dump dilakukan dalam beberapa bagian sehingga mereka bergantian dengan peristiwa waktu nyata dan tidak menunda pemrosesan mereka untuk waktu yang lama. Dumps dapat dilakukan kapan saja melalui API. Hal ini memungkinkan konsumen untuk mendapatkan keadaan penuh dari database pada tahap inisialisasi atau lambat untuk pemulihan bencana.

Saat merancang kerangka kerja, kami berpikir untuk meminimalkan dampak pada database. Dumps dapat dijeda dan dilanjutkan seperlunya. Ini berfungsi baik untuk pemulihan kerusakan dan untuk berhenti jika database telah menjadi hambatan. Kami juga tidak mengunci tabel agar tidak memengaruhi operasi penulisan.

DBLog memungkinkan Anda untuk merekam acara dalam berbagai bentuk, termasuk di basis data lain atau melalui API. Untuk menyimpan status yang terkait dengan pemrosesan log dan dump, serta untuk memilih node master, kami menggunakan Zookeeper. Saat membuat DBLog, kami menerapkan kemampuan untuk menghubungkan berbagai plugin, memungkinkan Anda untuk mengubah implementasi sesuai keinginan (misalnya, mengganti Zookeeper dengan yang lain).

Selanjutnya, kami mempertimbangkan secara lebih rinci pemrosesan log dan dump.

Log


Kerangka kerja ini membutuhkan database untuk merekam acara untuk setiap baris yang diubah secara real time, sambil mempertahankan urutan komitmen. Sumber peristiwa ini diasumsikan sebagai log transaksi. Basis data mengirimkannya melalui transportasi yang dapat digunakan DBLog. Untuk transportasi ini kami menggunakan istilah "log perubahan". Suatu acara dapat dari jenis berikut: buat, buat, perbarui, atau hapus. Untuk setiap peristiwa, informasi berikut harus diberikan: nomor urut dalam log (nomor urut log), keadaan kolom selama operasi, dan skema yang diterapkan pada saat operasi dilakukan.

Setiap perubahan serial ke dalam format acara DBLog dan dikirim ke penulis, untuk transfer lebih lanjut ke output. Mengirim acara ke penulis adalah operasi non-pemblokiran, karena penulis berjalan di utas terpisah dan mengumpulkan peristiwa dalam buffer internal. Acara yang disangga dikirim sesuai urutan penerimaannya. Kerangka kerja ini memungkinkan Anda untuk menghubungkan formatter khusus untuk membuat cerita bersambung peristiwa ke dalam format sewenang-wenang. Output adalah antarmuka sederhana yang memungkinkan Anda terhubung ke penerima apa pun, seperti aliran, gudang data, atau bahkan API.

Kesedihan


Pembuangan diperlukan karena log transaksi memiliki waktu penyimpanan terbatas, yang mencegahnya digunakan untuk memulihkan set data asli lengkap. Dump dibuat dalam blok (chunk) sehingga mereka dapat bergantian dengan peristiwa log, memungkinkan mereka untuk diproses secara bersamaan. Untuk setiap baris blok yang dipilih, suatu peristiwa dihasilkan dan diserialisasi dalam format yang sama dengan peristiwa log. Dengan demikian, konsumen tidak perlu khawatir suatu peristiwa datang dari log atau dump. Baik peristiwa log dan peristiwa dump dikirim ke output melalui penulis yang sama.

Dumps dapat dijadwalkan kapan saja melalui API untuk semua tabel, satu tabel, atau untuk kunci utama spesifik tabel. Tumpukan meja dilakukan oleh balok dengan ukuran tertentu. Anda juga dapat mengonfigurasi penundaan dalam memproses blok baru, yang hanya mengizinkan aktivitas log saat ini. Ukuran dan penundaan blok memungkinkan Anda menyeimbangkan pemrosesan peristiwa log dan dump. Kedua pengaturan dapat diubah saat runtime.

Blok (chunk) dipilih dengan mengurutkan tabel dalam urutan naik dari kunci primer dan memilih baris di mana kunci utama lebih besar dari kunci primer terakhir dari blok sebelumnya. Basis data diperlukan untuk menjalankan kueri ini secara efisien, yang biasanya berlaku untuk sistem yang menerapkan pemindaian rentang pada rentang kunci utama.


Gambar 1. Tabel breakdown dengan 4 kolom c1-c4 dan c1 sebagai kunci utama (pk). Kunci utama dari tipe integer, ukuran blok 3. Blok 2 dipilih dengan kondisi c1> 4.

Blok harus diambil sedemikian rupa agar tidak menunda pemrosesan peristiwa log untuk periode yang lama dan menyimpan riwayat perubahan sehingga baris yang dipilih dengan nilai lama tidak dapat menimpa yang lebih baru peristiwa.

Agar dapat memilih blok secara berurutan, dalam log perubahan kita membuat "tanda air" yang dapat dikenali. Tanda air diimplementasikan melalui tabel di basis data sumber. Tabel ini disimpan dalam namespace khusus sehingga tidak ada konflik dengan tabel aplikasi. Hanya satu baris dengan nilai UUID yang disimpan di dalamnya. Tanda air dibuat ketika nilai ini berubah menjadi UUID tertentu. Memperbarui baris mengarah ke acara perubahan, yang akhirnya kami terima melalui log perubahan.

Tempat pembuangan air ditandai sebagai berikut:

  1. Kami sementara menangguhkan pemrosesan acara log.
  2. Hasilkan tanda air "rendah" dengan memperbarui tabel tanda air.
  3. Kami mulai SELECT untuk blok berikutnya dan menyimpan dalam memori hasil diindeks oleh kunci utama.
  4. β€œβ€ (high) , .
  5. . .
  6. , .
  7. , , .
  8. , 1.

SELECT seharusnya mengembalikan negara yang mewakili perubahan berkomitmen hingga titik dalam sejarah. Atau, setara dengan yang berikut: SELECT dijalankan di posisi tertentu dari log perubahan, dengan mempertimbangkan perubahan akun hingga saat ini. Database biasanya tidak memberikan informasi tentang waktu eksekusi SELECT (dengan pengecualian MariaDB ).

Gagasan utama dari pendekatan kami adalah bahwa dalam log perubahan kami mendefinisikan jendela yang menjamin pelestarian posisi SELECT di blok. Jendela dibuka dengan menuliskan tanda air lebih rendah, setelah itu SELECT dijalankan, dan jendela ditutup dengan menuliskan tanda air atas. Karena posisi pasti SELECT tidak diketahui, semua baris yang dipilih yang bertentangan dengan peristiwa log di jendela ini dihapus. Ini memastikan bahwa tidak ada penulisan ulang sejarah di log perubahan.

Agar ini berfungsi, SELECT harus membaca status tabel sejak saat tanda air lebih rendah atau lebih baru (diizinkan untuk menyertakan perubahan yang dibuat setelah tanda air lebih rendah dan sebelum membaca). Secara umum, SELECT diperlukan untuk melihat perubahan yang dibuat sebelum dieksekusi.. Kami menyebutnya "bacaan non-basi". Selain itu, karena tanda air atas ditulis setelahnya, dijamin bahwa SELECT akan dieksekusi sebelum itu.

Gambar 2a dan 2b menggambarkan algoritma pemilihan blok. Sebagai contoh, kami memberikan tabel dengan kunci utama dari k1 ke k6. Setiap entri dalam log perubahan mewakili acara buat, perbarui, atau hapus untuk kunci utama. Gambar 2a menunjukkan pembuatan tanda air dan pemilihan blok (langkah 1 hingga 4). Memperbarui tabel tanda air pada langkah 2 dan 4 menciptakan dua peristiwa perubahan (magenta), yang akhirnya diterima melalui log. Pada Gambar 2b, kami fokus pada garis blok saat ini yang dihapus dari hasil yang ditetapkan dengan kunci utama yang muncul di antara tanda air (langkah 5 hingga 7).


Gambar 2a - Algoritma Watermark untuk pemilihan blok (langkah 1-4).


Gambar 2b - Algoritma Watermark untuk memilih blok (langkah 5-7).

Harap dicatat bahwa antara tanda air yang lebih rendah dan yang lebih tinggi, sejumlah besar peristiwa dapat muncul di log jika satu atau lebih transaksi membuat banyak perubahan baris. Untuk alasan ini, kami melakukan penangguhan jangka pendek dari pemrosesan log pada tahap 2-4 agar tidak ketinggalan tanda air. Dengan demikian, pemrosesan peristiwa log dapat dilanjutkan peristiwa demi peristiwa, yang pada akhirnya memungkinkan Anda untuk mendeteksi tanda air tanpa perlu melakukan cache peristiwa log. Pemrosesan log ditangguhkan hanya untuk waktu yang singkat, karena langkah 2–4 diharapkan lebih cepat: memperbarui tanda air adalah operasi penulisan tunggal, dan SELECT dilakukan dengan pembatasan.

Segera setelah tanda air atas diterima pada langkah 7, garis-garis blok yang tidak bertentangan ditransmisikan ke output sesuai urutan mereka diterima. Ini adalah operasi non-pemblokiran, karena penulis berjalan di utas terpisah, yang memungkinkan Anda untuk dengan cepat melanjutkan memproses log setelah langkah 7. Setelah itu, pemrosesan log berlanjut untuk peristiwa yang terjadi setelah tanda air atas.

Gambar 2c menunjukkan urutan rekaman untuk seluruh blok menggunakan contoh yang sama seperti pada gambar 2a dan 2b. Peristiwa dalam log yang muncul sebelum tanda air atas dicatat terlebih dahulu. Kemudian garis yang tersisa dari hasil blok (magenta). Dan akhirnya, peristiwa yang terjadi setelah tanda air teratas dicatat.


Gambar 2c - Urutan perekaman output. Strip pergantian dengan dump.

Database yang didukung


Untuk menggunakan DBLog, database harus menyediakan log perubahan sebagai riwayat linear dari perubahan yang dilakukan dengan pembacaan non-basi. Kondisi ini dipenuhi oleh sistem seperti MySQL, PostgreSQL, MariaDB, dll., Sehingga kerangka kerja dapat digunakan dengan cara yang sama dengan database ini.
Sejauh ini, kami telah menambahkan dukungan untuk MySQL dan PostgreSQL. Setiap basis data menggunakan perpustakaannya sendiri untuk menerima peristiwa log, karena masing-masing menggunakan protokol berpemilik. Untuk MySQL, kami menggunakan shyiko / mysql-binlog-connector , yang mengimplementasikan protokol replikasi binlog. Untuk PostgreSQL, ada slot replikasi dengan plugin wal2json . Perubahan diterima melalui protokol replikasi streaming, yang diimplementasikan oleh driver jdbcPostgreSQL Definisi skema untuk setiap perubahan yang diambil berbeda di MySQL dan PostgreSQL. Di PostgreSQL, wal2json berisi nama, tipe kolom, dan nilai-nilai. Untuk MySQL, perubahan skema harus dilacak sebagai peristiwa binlog.

Pemrosesan Dump dilakukan menggunakan SQL dan JDBC, hanya membutuhkan implementasi pemilihan blok dan memperbarui tanda air. Untuk MySQL dan PostgreSQL, kode yang sama digunakan, yang dapat digunakan untuk database serupa lainnya. Pemrosesan dump itu sendiri tidak tergantung pada SQL atau JDBC dan memungkinkan Anda untuk menggunakan database yang memenuhi persyaratan DBLog, bahkan jika mereka menggunakan standar yang berbeda.


Gambar 3 - Arsitektur DBLog tingkat tinggi.

Ketersediaan tinggi


DBLog menggunakan arsitektur pasif aktif node tunggal. Satu instance aktif (terkemuka), sementara yang lain pasif (siaga). Untuk memilih host, kami menggunakan Zookeeper. Sewa digunakan untuk node master, yang harus diperbarui secara berkala untuk terus menjadi master. Dalam hal penghentian pembaruan sewa, fungsi pemimpin ditransfer ke simpul lain. Saat ini, kami menyebarkan satu salinan untuk setiap AZ (zona ketersediaan, biasanya kami memiliki 3 AZ), jadi jika satu AZ jatuh, maka salinan di AZ lain dapat melanjutkan pemrosesan dengan total waktu henti minimum. Mesin virtual cadangan dapat ditemukan di wilayah yang berbeda, meskipun Anda disarankan untuk bekerja di wilayah yang sama dengan host database untuk memberikan latensi rendah untuk menangkap perubahan.

Gunakan pada Produksi


DBLog adalah dasar untuk konektor MySQL dan PostgreSQL yang digunakan di Delta . Sejak 2018, Delta telah digunakan dalam produksi untuk menyinkronkan gudang data dan pemrosesan acara di aplikasi studio Netflix. Konektor Delta menggunakan serializer acara mereka. Aliran spesifik Netflix seperti Keystone digunakan sebagai output .


Gambar 4 - Konektor Delta.

Selain Delta, DBLog juga digunakan di Netflix untuk membuat konektor untuk platform pergerakan data lainnya yang memiliki format data sendiri.

tetaplah bersama kami


DBLog memiliki fitur tambahan yang tidak tercakup dalam artikel ini, seperti:

  • Kemampuan untuk mendapatkan skema tabel tanpa menggunakan kunci.
  • Integrasi dengan penyimpanan skema. Untuk setiap peristiwa, skema disimpan dalam penyimpanan, tautan yang ditunjukkan dalam muatan acara.
  • Mode tulis monoton. Memastikan bahwa setelah menyimpan status baris tertentu, status lampau tidak dapat ditimpa. Dengan demikian, konsumen menerima perubahan negara hanya dalam arah maju, tanpa bergerak maju mundur dalam waktu.

Kami berencana untuk membuka kode sumber DBLog pada tahun 2020 dan menyertakan dokumentasi tambahan di dalamnya.

Ucapan Terima Kasih


Kami ingin mengucapkan terima kasih kepada orang-orang berikut untuk berkontribusi pada pengembangan DBLog: Josh Snyder , Raghuram Onti Srinivasan , Tharanga Gamaethige dan Yun Wang .

Referensi


[1] Das, Shirshanka, dkk. "Semua di atas Databus!: Platform pengambilan data perubahan yang konsisten yang dapat diukur dari Linkedin." Simposium ACM Ketiga tentang Cloud Computing. ACM, 2012
[2] "Tentang Ubah Pengambilan Data (SQL Server)" , Microsoft SQL docs, 2019
[3] Kleppmann, Martin, "Menggunakan log untuk membangun infrastruktur data yang solid (atau: mengapa menulis ganda adalah ide yang buruk)" , Confluent, 2015
[4] Kleppmann, Martin, Alastair R. Beresford, Boerge Svingen. "Pemrosesan acara online." Komunikasi ACM 62.5 (2019): 43–49
[5] https://debezium.io/documentation/reference/0.10/connectors/mysql.html#snapshots


Pelajari lebih lanjut tentang kursus.

All Articles