कैसे अपने स्वयं के प्रोसेसर लिखने या NiFi में कार्यक्षमता का विस्तार करने के लिए

NiFi अधिक से अधिक लोकप्रियता प्राप्त कर रहा है और हर नई रिलीज़ के साथ इसे डेटा के साथ काम करने के लिए अधिक से अधिक उपकरण मिलते हैं। फिर भी, किसी विशेष समस्या को हल करने के लिए आपका अपना उपकरण होना आवश्यक हो सकता है। Apache Nifi के बेस पैकेज में 300 से अधिक प्रोसेसर हैं। Nifi प्रोसेसर





यह NiFi इकोसिस्टम में डेटाफ्लो बनाने के लिए मुख्य बिल्डिंग ब्लॉक है। प्रोसेसर एक इंटरफ़ेस प्रदान करते हैं, जिसके माध्यम से NiFi फ्लोफाइल, इसकी विशेषताओं और सामग्री तक पहुंच प्रदान करता है। स्वयं के कस्टम प्रोसेसर बिजली, समय और उपयोगकर्ता का ध्यान बचाएंगे, क्योंकि कई सरल प्रोसेसर तत्वों के बजाय, इंटरफ़ेस में केवल एक प्रदर्शित किया जाएगा और केवल एक को निष्पादित किया जाएगा (अच्छी तरह से, या आप कितना लिखते हैं)। मानक प्रोसेसर की तरह, एक कस्टम प्रोसेसर आपको विभिन्न संचालन करने और एक प्रवाह की सामग्री को संसाधित करने की अनुमति देता है। आज हम कार्यक्षमता के विस्तार के लिए मानक उपकरणों के बारे में बात करेंगे।

ExecuteScript


ExecuteScript एक सार्वभौमिक प्रोसेसर है जिसे एक प्रोग्रामिंग भाषा (Groovy, Jython, Javascript, JRuby) में व्यावसायिक तर्क को लागू करने के लिए डिज़ाइन किया गया है। यह दृष्टिकोण आपको वांछित कार्यक्षमता प्राप्त करने की अनुमति देता है। एक स्क्रिप्ट में NiFi घटकों तक पहुंच प्रदान करने के लिए, निम्न चर का उपयोग करना संभव है:

सत्र : प्रकार का एक प्रकार। org.apache.nifi.processor.ProcessSession। चर आपको फ्लोफाइल के साथ संचालन करने की अनुमति देता है, जैसे कि क्रिएट (), putAttribute () और ट्रांसफर (), साथ ही पढ़ा () और लिखना ()।

प्रसंग : org.apache.nifi.processor.ProcessContext इसका उपयोग प्रोसेसर के गुण, संबंध, नियंत्रक सेवाओं और StateManager को प्राप्त करने के लिए किया जा सकता है।

REL_SUCCESS : संबंध "सफलता"।

REL_FAILURE : विफलता संबंध

डायनेमिक गुण : ExecuteScript में परिभाषित डायनेमिक गुण स्क्रिप्ट इंजन के लिए वैरिएबल के रूप में दिए जाते हैं, एक PropertyValue के रूप में सेट किए जाते हैं। यह आपको संपत्ति का मूल्य प्राप्त करने की अनुमति देता है, मूल्य को उपयुक्त डेटा प्रकार में परिवर्तित करता है, उदाहरण के लिए, तार्किक, आदि

। उपयोग के लिए, बस हमारी स्क्रिप्ट या स्क्रिप्ट के साथ Script Engineफ़ाइल का स्थान चुनें और निर्दिष्ट करें आइए कुछ उदाहरणों को देखें: कतार से एक स्ट्रीम फ़ाइल प्राप्त करेंScript File Script Body







flowFile = session.get() 
if(!flowFile) return

एक नया FlowFile जनरेट करें

flowFile = session.create()
// Additional processing here

फ़्लोफ़ाइल में विशेषता जोड़ें

flowFile = session.get() 
if(!flowFile) return
flowFile = session.putAttribute(flowFile, 'myAttr', 'myValue')

सभी विशेषताओं को निकालें और संसाधित करें।

flowFile = session.get() if(!flowFile) return
flowFile.getAttributes().each { key,value ->
// Do something with the key/value pair
}

लकड़हारा

log.info('Found these things: {} {} {}', ['Hello',1,true] as Object[])

आप ExecuteScript में उन्नत सुविधाओं का उपयोग कर सकते हैं, इसके बारे में अधिक लेख ExecuteScript Cookbook में पाया जा सकता है।

ExecuteGroovyScript


ExecuteGroovyScript में ExecuteScript के समान कार्यक्षमता है, लेकिन मान्य भाषाओं के चिड़ियाघर के बजाय, आप केवल एक का उपयोग कर सकते हैं - groovy। इस प्रोसेसर का मुख्य लाभ सेवा सेवाओं का अधिक सुविधाजनक उपयोग है। चर सत्र, संदर्भ, आदि के मानक सेट के अलावा। आप CTL और SQL उपसर्ग के साथ गतिशील गुणों को परिभाषित कर सकते हैं। संस्करण 1.11 से शुरू होकर, RecordReader और Record Writer का समर्थन दिखाई दिया। सभी संपत्तियाँ हैशपॉप हैं, जो कुंजी के रूप में "सेवा नाम" का उपयोग करती हैं, और संपत्ति के आधार पर मूल्य एक विशिष्ट वस्तु है:

RecordWriter          HashMap<String,RecordSetWriterFactory>
RecordReader         HashMap<String,RecordReaderFactory>
SQL                       HashMap<String,groovy.sql.Sql> 
CTL                       HashMap<String,ControllerService>

यह जानकारी पहले से ही जीवन को आसान बनाती है। हम स्रोतों में देख सकते हैं या किसी विशेष वर्ग के लिए प्रलेखन पा सकते हैं।

डेटाबेस के साथ कार्य करना

यदि हम SQL.DB संपत्ति को परिभाषित करते हैं और DBCPService को बांधते हैं, तो हम कोड से संपत्ति तक पहुंचेंगे। SQL.DB.rows('select * from table')



प्रोसेसर स्वचालित रूप से निष्पादन से पहले dbcp सेवा से एक कनेक्शन स्वीकार करता है और लेनदेन की प्रक्रिया करता है। डेटाबेस ट्रांजेक्शंस स्वचालित रूप से वापस आ जाते हैं जब कोई त्रुटि होती है और सफल होने पर प्रतिबद्ध होती है। ExecuteGroovyScript में, आप उचित स्थिर विधियों को लागू करके घटनाओं को शुरू और रोक सकते हैं।

import org.apache.nifi.processor.ProcessContext
...
static onStart(ProcessContext context){
// your code
}
static onStop(ProcessContext context){
// your code
}
REL_SUCCESS << flowFile

InvokeScriptedProcessor


एक और दिलचस्प प्रोसेसर। इसका उपयोग करने के लिए, आपको एक वर्ग की घोषणा करने की आवश्यकता है जो लागू करता है इंटरफ़ेस लागू करता है और एक प्रोसेसर चर को परिभाषित करता है। आप किसी भी PropertyDescriptor या रिलेशनशिप को परिभाषित कर सकते हैं, पैरेंट कंपोनेंटलॉग को भी एक्सेस कर सकते हैं और void onScheduled (ProcessContext संदर्भ) और शून्य ऑनस्टॉप (ProcessContext संदर्भ) विधियों को परिभाषित कर सकते हैं। इन तरीकों को तब बुलाया जाएगा जब NiFi (onScheduled) में कोई शेड्यूल किया गया इवेंट शुरू होता है और जब यह बंद हो जाता है (onScheduled)।

class GroovyProcessor implements Processor {
          @Override
          void initialize(ProcessorInitializationContext context) { log = context.getLogger()
          }

          @Override
          void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) thr

          @Override
          Collection<ValidationResult> validate(ValidationContext context) { return null 

          @Override
          PropertyDescriptor getPropertyDescriptor(String name) { return null	}

          @Override
          void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String n

          @Override
          List<PropertyDescriptor> getPropertyDescriptors() { return null }

          @Override
          String getIdentifier() { return null }
}

तर्क को विधि में लागू किया जाना चाहिए void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory)

 @Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throw s ProcessException {
            final ProcessSession session = sessionFactory.createSession(); def 
            flowFile = session.create()
           if (!flowFile) return
                 // your code
           try 
                 { session.commit();
           } catch (final Throwable t) { 
                 session.rollback(true); 
                 throw t;
           }
}

इंटरफ़ेस में घोषित सभी तरीकों का वर्णन करना अनावश्यक है, तो चलो एक सार वर्ग मिलता है जिसमें हम निम्नलिखित विधि की घोषणा करते हैं:

abstract void executeScript(ProcessContext context, ProcessSession session)

जिस विधि में हम बुलाएंगे

void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory)

import org.apache.nifi.components.PropertyDescriptor import org.apache.nifi.components.ValidationContext import org.apache.nifi.components.ValidationResult import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.ProcessSessionFactory import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException 

abstract class BaseGroovyProcessor implements Processor {

	public ComponentLog log

	public Set<Relationship> relationships; 

	@Override
	void initialize(ProcessorInitializationContext context) { log = context.getLogger()
	}

	@Override
	void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) thr final ProcessSession session = sessionFactory.createSession();
	try {
		executeScript(context, session); 
		session.commit();
	} catch (final Throwable t) { 
		session.rollback(true); 
		throw t;
	}
}

	abstract void executeScript(ProcessContext context, ProcessSession session) thro

	@Override
	Collection<ValidationResult> validate(ValidationContext context) { return null }

	@Override
	PropertyDescriptor getPropertyDescriptor(String name) { return null	} 

	@Override
	void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String n

	@Override
	List<PropertyDescriptor> getPropertyDescriptors() { return null }
	
	@Override
	String getIdentifier() { return null }

}

अब हम एक उत्तराधिकारी वर्ग की घोषणा करते हैं BaseGroovyProcessorऔर हमारे निष्पादनों का वर्णन करते हैं, हम रिलेशनशिप रैलसक्युस और रिलेटिव को भी जोड़ते हैं।

import org.apache.commons.lang3.tuple.Pair
import org.apache.nifi.expression.ExpressionLanguageScope import org.apache.nifi.processor.util.StandardValidators import ru.rt.nifi.common.BaseGroovyProcessor

import org.apache.nifi.components.PropertyDescriptor import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.processor.ProcessContext import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.exception.ProcessException import org.quartz.CronExpression

import java.sql.Connection
import java.sql.PreparedStatement import java.sql.ResultSet
import java.sql.SQLException import java.sql.Statement

class InvokeScripted extends BaseGroovyProcessor {
	public static final REL_SUCCESS = new Relationship.Builder()
		.name("success")
		.description("If the cache was successfully communicated with it will be rou
		.build()

	public static final REL_FAILURE = new Relationship.Builder()
		.name("failure")
		.description("If unable to communicate with the cache or if the cache entry
		.build()

	@Override
	void executeScript(ProcessContext context, ProcessSession session) throws Proces 	def flowFile = session.create()
		if (!flowFile) return

		try {
				// your code

			session.transfer(flowFile, REL_SUCCESS)
		} catch(ProcessException | SQLException e) { 
			session.transfer(flowFile, REL_FAILURE)
			log.error("Unable to execute SQL select query {} due to {}. No FlowFile
		}
	}
}



कोड के अंत में जोड़ें। processor = new InvokeScripted()

यह दृष्टिकोण एक कस्टम प्रोसेसर बनाने के समान है।

निष्कर्ष


एक कस्टम प्रोसेसर बनाना सबसे आसान काम नहीं है - पहली बार आपको यह पता लगाने के लिए कड़ी मेहनत करनी होगी, लेकिन इस कार्रवाई के लाभ निर्विवाद हैं।

रोस्टेलकॉम डेटा मैनेजमेंट टीम द्वारा तैयार पोस्ट

All Articles