Cara menulis prosesor Anda sendiri atau memperluas fungsionalitas di NiFi

NiFi semakin populer dan dengan setiap rilis baru ini semakin banyak alat untuk bekerja dengan data. Namun demikian, mungkin Anda perlu memiliki alat sendiri untuk menyelesaikan masalah tertentu. Apache Nifi memiliki lebih dari 300 prosesor dalam paket dasarnya. Prosesor Nifi





Ini adalah blok bangunan utama untuk membuat aliran data di ekosistem NiFi. Prosesor menyediakan antarmuka di mana NiFi menyediakan akses ke flowfile, atributnya, dan konten. Prosesor kustom sendiri akan menghemat energi, waktu, dan perhatian pengguna, karena alih-alih banyak elemen prosesor sederhana, hanya satu yang akan ditampilkan di antarmuka dan hanya satu yang akan dieksekusi (baik, atau berapa banyak yang Anda tulis). Seperti prosesor standar, prosesor kustom memungkinkan Anda untuk melakukan berbagai operasi dan memproses isi dari suatu flowfile. Hari ini kita akan berbicara tentang alat standar untuk memperluas fungsionalitas.

ExecuteScript


ExecuteScript adalah prosesor universal yang dirancang untuk mengimplementasikan logika bisnis dalam bahasa pemrograman (Groovy, Jython, Javascript, JRuby). Pendekatan ini memungkinkan Anda untuk dengan cepat mendapatkan fungsionalitas yang diinginkan. Untuk memberikan akses ke komponen NiFi dalam skrip, dimungkinkan untuk menggunakan variabel berikut:

Sesi : variabel tipe org.apache.nifi.processor.ProcessSession. Variabel memungkinkan Anda untuk melakukan operasi dengan flowfile, seperti create (), putAttribute () dan Transfer (), serta read () dan write ().

Konteks : org.apache.nifi.processor.ProcessContext. Ini dapat digunakan untuk mendapatkan properti prosesor, hubungan, layanan pengontrol, dan StateManager.

REL_SUCCESS : Hubungan "sukses".

REL_FAILURE : Hubungan Kegagalan

Properti Dinamis : Properti dinamis yang ditentukan dalam ExecuteScript diteruskan ke mesin skrip sebagai variabel, ditetapkan sebagai PropertyValue. Ini memungkinkan Anda untuk mendapatkan nilai properti, mengonversi nilai ke tipe data yang sesuai, misalnya, logis, dll.

Untuk digunakan, cukup pilih Script Enginedan tentukan lokasi file Script Filedengan skrip kami atau skrip itu sendiri Script Body.



Mari kita lihat beberapa contoh:

dapatkan satu aliran file dari antrian

flowFile = session.get() 
if(!flowFile) return

Hasilkan FlowFile baru

flowFile = session.create()
// Additional processing here

Tambahkan Atribut ke FlowFile

flowFile = session.get() 
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

Ekstrak dan proses semua atribut.

flowFile = session.get() if(!flowFile) return
flowFile.getAttributes().each { key,value ->
// Do something with the key/value pair
}

Logger

log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])

Anda dapat menggunakan fitur-fitur canggih dalam ExecuteScript, lebih lanjut tentang ini dapat ditemukan di artikel ExecuteScript Cookbook.

ExecuteGroovyScript


ExecuteGroovyScript memiliki fungsi yang sama dengan ExecuteScript, tetapi alih-alih kebun binatang bahasa yang valid, Anda hanya dapat menggunakan satu-groovy. Keuntungan utama dari prosesor ini adalah penggunaan layanan layanan yang lebih nyaman. Selain set variabel standar Sesi, Konteks, dll. Anda bisa mendefinisikan properti dinamis dengan awalan CTL dan SQL. Dimulai dengan versi 1.11, dukungan untuk RecordReader dan Record Writer muncul. Semua properti adalah HashMap, yang menggunakan "Nama Layanan" sebagai kunci, dan nilainya adalah objek tertentu tergantung pada properti:

RecordWriter          HashMap<String,RecordSetWriterFactory>
RecordReader         HashMap<String,RecordReaderFactory>
SQL                       HashMap<String,groovy.sql.Sql> 
CTL                       HashMap<String,ControllerService>

Informasi ini sudah membuat hidup lebih mudah. kita dapat melihat ke sumber atau menemukan dokumentasi untuk kelas tertentu.

Bekerja dengan database

Jika kita mendefinisikan properti SQL.DB dan mengikat DBCPService, maka kita akan mengakses properti dari kode tersebut. SQL.DB.rows('select * from table')



Prosesor secara otomatis menerima koneksi dari layanan dbcp sebelum eksekusi dan memproses transaksi. Transaksi basis data secara otomatis dibatalkan ketika kesalahan terjadi dan dilakukan jika berhasil. Di ExecuteGroovyScript, Anda dapat mencegat acara mulai dan berhenti dengan menerapkan metode statis yang sesuai.

import org.apache.nifi.processor.ProcessContext
...
static onStart(ProcessContext context){
// your code
}
static onStop(ProcessContext context){
// your code
}
REL_SUCCESS << flowFile

InvokeScriptedProcessor


Prosesor lain yang menarik. Untuk menggunakannya, Anda perlu mendeklarasikan kelas yang mengimplementasikan antarmuka implement dan mendefinisikan variabel prosesor. Anda bisa mendefinisikan PropertyDescriptor atau Hubungan apa pun, juga mengakses ComponentLog induk dan menentukan metode batal diJadwal (konteks ProcessContext) dan batal onStopped (konteks ProcessContext). Metode-metode ini akan dipanggil ketika acara mulai terjadwal terjadi di NiFi (onScheduled) dan ketika berhenti (onScheduled).

class GroovyProcessor implements Processor {
          @Override
          void initialize(ProcessorInitializationContext context) { log = context.getLogger()
          }

          @Override
          void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) thr

          @Override
          Collection<ValidationResult> validate(ValidationContext context) { return null 

          @Override
          PropertyDescriptor getPropertyDescriptor(String name) { return null	}

          @Override
          void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String n

          @Override
          List<PropertyDescriptor> getPropertyDescriptors() { return null }

          @Override
          String getIdentifier() { return null }
}

Logika harus diimplementasikan dalam metode ini void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory)

 @Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throw s ProcessException {
            final ProcessSession session = sessionFactory.createSession(); def 
            flowFile = session.create()
           if (!flowFile) return
                 // your code
           try 
                 { session.commit();
           } catch (final Throwable t) { 
                 session.rollback(true); 
                 throw t;
           }
}

Tidak perlu untuk menggambarkan semua metode yang dideklarasikan di antarmuka, jadi mari kita berkeliling satu kelas abstrak di mana kita mendeklarasikan metode berikut:

abstract void executeScript(ProcessContext context, ProcessSession session)

Metode yang akan kita panggil

void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory)

import org.apache.nifi.components.PropertyDescriptor import org.apache.nifi.components.ValidationContext import org.apache.nifi.components.ValidationResult import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException 

abstract class BaseGroovyProcessor implements Processor {

	public ComponentLog log

	public Set<Relationship> relationships; 

	@Override
	void initialize(ProcessorInitializationContext context) { log = context.getLogger()
	}

	@Override
	void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) thr final ProcessSession session = sessionFactory.createSession();
	try {
		executeScript(context, session); 
		session.commit();
	} catch (final Throwable t) { 
		session.rollback(true); 
		throw t;
	}
}

	abstract void executeScript(ProcessContext context, ProcessSession session) thro

	@Override
	Collection<ValidationResult> validate(ValidationContext context) { return null }

	@Override
	PropertyDescriptor getPropertyDescriptor(String name) { return null	} 

	@Override
	void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String n

	@Override
	List<PropertyDescriptor> getPropertyDescriptors() { return null }
	
	@Override
	String getIdentifier() { return null }

}

Sekarang kita mendeklarasikan kelas penerus BaseGroovyProcessordan mendeskripsikan executScript kita, kita juga menambahkan Relationship RELSUCCESS dan RELFAILURE.

import org.apache.commons.lang3.tuple.Pair
import org.apache.nifi.expression.ExpressionLanguageScope import org.apache.nifi.processor.util.StandardValidators import ru.rt.nifi.common.BaseGroovyProcessor

import org.apache.nifi.components.PropertyDescriptor import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.processor.ProcessContext import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.exception.ProcessException import org.quartz.CronExpression

import java.sql.Connection
import java.sql.PreparedStatement import java.sql.ResultSet
import java.sql.SQLException import java.sql.Statement

class InvokeScripted extends BaseGroovyProcessor {
	public static final REL_SUCCESS = new Relationship.Builder()
		.name("success")
		.description("If the cache was successfully communicated with it will be rou
		.build()

	public static final REL_FAILURE = new Relationship.Builder()
		.name("failure")
		.description("If unable to communicate with the cache or if the cache entry
		.build()

	@Override
	void executeScript(ProcessContext context, ProcessSession session) throws Proces 	def flowFile = session.create()
		if (!flowFile) return

		try {
				// your code

			session.transfer(flowFile, REL_SUCCESS)
		} catch(ProcessException | SQLException e) { 
			session.transfer(flowFile, REL_FAILURE)
			log.error("Unable to execute SQL select query {} due to {}. No FlowFile
		}
	}
}



Tambahkan ke akhir kode. processor = new InvokeScripted()

Pendekatan ini mirip dengan membuat prosesor kustom.

Kesimpulan


Membuat prosesor khusus bukanlah hal yang mudah - untuk pertama kali Anda harus bekerja keras untuk mengetahuinya, tetapi manfaat dari tindakan ini tidak dapat dipungkiri.

Pos disiapkan oleh Tim Manajemen Data Rostelecom

All Articles