|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有帐号?立即注册
x
对于一个大型项目,如果用java来作,可能需要9个月,并且可能需要翻阅10本以上的书,但如果用ruby来作,3个月,3本书就足够了,而.net网页编程也不过3,4本书足以,这就是区别。在前一篇文章《Oozie简介》中,我们已形貌了Oozie事情流服务器,而且展现了一个十分复杂的事情流示例。我们还形貌了针对Oozie的事情流的部署和设置,和用来启动、中断和监控Oozie事情流的工具。
在本文中,我们会形貌一个加倍庞大的例子,经由过程它我们能够会商更多Oozie特征,并演示怎样来利用它们。
界说历程
我们在此形貌的事情流会完成汽车GPS探测数据的猎取历程。我们每一个小时城市以文件的情势把探测数据传送到指定的HDFS目次中[1],个中包括有这个小时以内的一切探测数据。探测数据的猎取是天天针对一天内一切的24个文件完成的。假如文件的数目是24,那末猎取历程就会启动。不然:
- 当天甚么都不做
- 对前一天——最多到7天,发送剩下的内容到探测数据供应程序
- 假如目次的存在工夫已到达7天,那末就猎取一切可用的探测数据文件。
历程的整体完成请见
(点击能够检察年夜图)。
<br>
:历程图
在此,支流程(数据猎取流程)起首会为明天和之前的六天盘算出目次的称号,然后启动(fork)七个目次的子历程(子流程)。待一切子历程的形态都酿成停止以后,join步骤就会把把持权交给end形态。
子历程启动时,起首会取得关于目次的信息——它的日期和文件数目。基于这条信息,它会决意是猎取数据仍是把数据回档,大概发送剩下的邮件,大概不做任何事情。
Directory子历程完成
以下代码卖力完成的是directory子历程(代码1)。- <workflow-appxmlns=uri:oozie:workflow:0.1name=processDir><startto=getDirInfo/><!--STEPONE--><actionname=getDirInfo><!--writes2properties:dir.num-files:returns-1ifdirdoesntexist,otherwisereturns#offilesindirdir.age:returns-1ifdirdoesntexist,otherwisereturnsageofdirindays--><java><job-tracker>${jobTracker}</job-tracker><name-node>${nameNode}</name-node><main-class>com.navteq.oozie.GetDirInfo</main-class><arg>${inputDir}</arg><capture-output/></java><okto="makeIngestDecision"/><errorto="fail"/></action><!--STEPTWO--><decisionname="makeIngestDecision"><switch><!--emptyordoesntexist--><caseto="end">${wf:actionData(getDirInfo)[dir.num-files]lt0||(wf:actionData(getDirInfo)[dir.age]lt1andwf:actionData(getDirInfo)[dir.num-files]lt24)}</case><!--#offiles>=24--><caseto="ingest">${wf:actionData(getDirInfo)[dir.num-files]gt23||wf:actionData(getDirInfo)[dir.age]gt6}</case><defaultto="sendEmail"/></switch></decision><!--EMAIL--><actionname="sendEmail"><java><job-tracker>${jobTracker}</job-tracker><name-node>${nameNode}</name-node><main-class>com.navteq.oozie.StandaloneMailer</main-class><arg>probedata2@navteq.com</arg><arg>gregory.titievsky@navteq.com</arg><arg>${inputDir}</arg><arg>${wf:actionData(getDirInfo)[dir.num-files]}</arg><arg>${wf:actionData(getDirInfo)[dir.age]}</arg></java><okto="end"/><errorto="fail"/></action><!--INGESTION--><actionname="ingest"><java><job-tracker>${jobTracker}</job-tracker><name-node>${nameNode}</name-node><prepare><deletepath="${outputDir}"/></prepare><configuration><property><name>mapred.reduce.tasks</name><value>300</value></property></configuration><main-class>com.navteq.probedata.drivers.ProbeIngest</main-class><arg>-conf</arg><arg>action.xml</arg><arg>${inputDir}</arg><arg>${outputDir}</arg></java><okto="archive-data"/><errorto="ingest-fail"/></action><!—ArchiveData--><actionname="archive-data"><fs><movesource=${inputDir}target=/probe/backup/${dirName}/><deletepath=${inputDir}/></fs><okto="end"/><errorto="ingest-fail"/></action><killname="ingest-fail"><message>Ingestionfailed,errormessage[${wf:errorMessage(wf:lastErrorNode())}]</message></kill><killname="fail"><message>Javafailed,errormessage[${wf:errorMessage(wf:lastErrorNode())}]</message></kill><endname=end/></workflow-app>
复制代码 代码1:Directory子历程
这个子历程的start节点会触发自界说的java节点,这个节点会取得目次信息(代码2)。- packagecom.navteq.oozie;importjava.io.File;importjava.io.FileOutputStream;importjava.io.OutputStream;importjava.util.GregorianCalendar;importjava.util.Properties;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FileStatus;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;publicclassGetDirInfo{privatestaticfinalStringOOZIE_ACTION_OUTPUT_PROPERTIES="oozie.action.output.properties";publicstaticvoidmain(String[]args)throwsException{StringdirPath=args[0];StringpropKey0="dir.num-files";StringpropVal0="-1";StringpropKey1="dir.age";StringpropVal1="-1";System.out.println("Directorypath:"+dirPath+"");Configurationconf=newConfiguration();FileSystemfs=FileSystem.get(conf);PathhadoopDir=newPath(dirPath);if(fs.exists(hadoopDir)){FileStatus[]files=FileSystem.get(conf).listStatus(hadoopDir);intnumFilesInDir=files.length;propVal0=Integer.toString(numFilesInDir);longtimePassed,daysPassedLong;intdaysPassed;StringdirName=hadoopDir.getName();String[]dirNameArray=dirName.split("-");if(dirNameArray.length==3){intyear=Integer.valueOf(dirNameArray[0]);intmonth=Integer.valueOf(dirNameArray[1])-1;//monthsare0basedintdate=Integer.valueOf(dirNameArray[2]);GregorianCalendardirCreationDate=newGregorianCalendar(year,month,date);timePassed=(newGregorianCalendar()).getTimeInMillis()-dirCreationDate.getTimeInMillis();daysPassed=(int)=timePassed/1000/60/60/24;;propVal1=Integer.toString(daysPassed);}}StringoozieProp=System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);if(oozieProp!=null){FilepropFile=newFile(oozieProp);Propertiesprops=newProperties();props.setProperty(propKey0,propVal0);props.setProperty(propKey1,propVal1);OutputStreamos=newFileOutputStream(propFile);props.store(os,"");os.close();}elsethrownewRuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES+"Systempropertynotdefined");}}
复制代码 代码2:取得目次信息的节点
这个类会取得目次名作为输出的参数,并起首反省该目次是不是存在。假如目次不存在,那末存在工夫(age)和文件数目城市前往-1,不然,这两个值就会前往给子历程。
子历程的下一步是一个switch(决意)声明,它会决意怎样处置目次。假如目次不存在(文件数<0),大概是以后日期(存在工夫<1)而且文件数目少于24(文件数<24),那末子历程就会间接转换到停止形态。假如一切文件都位于子目次中(文件数>23)大概目次是在最少七天前创立的(存在工夫>6),那末就会有以下操纵:
- 利用现存的Map/reduce程序[2]猎取数据
- 目次会备份在数据回档中,然后删除
对action节点的别的设置
猎取举措向你展现了别的一些Oozie设置参数,包含:
- Prepare——假如呈现了prepare参数,就意味着在启举措业(job)之前会删除路径列表。这应当专门用于清算目次。删除操纵会在fs.default.name文件体系中实行。
- Configuration——假如呈现了configuration元素,它个中就会包括针对Map/Reduce功课的JobConf属性。它不但能够用于map/reduce举措,并且还能够用于启动map/reduce功课的java举措。
假如不是以上两种情形,那末子历程就会发送残剩的邮件,然前进出。邮件是作为另外一个java主类完成的(代码3)。- packagecom.navteq.oozie;importjava.util.Properties;importjavax.mail.Message;importjavax.mail.Session;importjavax.mail.Transport;importjavax.mail.internet.InternetAddress;importjavax.mail.internet.MimeMessage;publicclassStandaloneMailer{privatestaticString_mServer="imailchi.navtech.com";privatestaticProperties_props=null;privateStandaloneMailer(){}publicstaticvoidinit(StringmServer){_mServer=mServer;_props=newProperties();_props.setProperty("mail.smtp.host",_mServer);}publicstaticvoidSendMail(Stringsubject,Stringmessage,Stringfrom,Stringto)throwsException{//createsomepropertiesandgetthedefaultSessionSessionsession=Session.getDefaultInstance(_props,null);//createamessageMessagemsg=newMimeMessage(session);//setthefromandtoaddressInternetAddressaddressFrom=newInternetAddress(from);msg.setFrom(addressFrom);String[]recipients=newString[]{to};InternetAddress[]addressTo=newInternetAddress[recipients.length];for(inti=0;i<recipients.length;i++){addressTo[i]=newInternetAddress(recipients[i]);}msg.setRecipients(Message.RecipientType.TO,addressTo);//SettingtheSubjectandContentTypemsg.setSubject(subject);msg.setContent(message,"text/plain");Transport.send(msg);}publicstaticvoidmain(String[]args)throwsException{if(args.length==5){init(_mServer);StringBuildersubject=newStringBuilder();StringBuilderbody=newStringBuilder();subject.append("Directory").append(args[2]).append("contains").append(args[3]).append("files.");body.append("Directory").append(args[2]).append("is").append(args[4]).append("daysoldandcontainsonly").append(args[3]).append("filesinsteadof24.");SendMail(subject.toString(),body.toString(),args[0],args[1]);}elsethrownewException("Invalidnumberofparametersprovidedforemail");}}
复制代码 列表3:发送提示邮件
这是利用了javax.mailAPI的复杂完成,用于发送邮件。
主历程的完成
我们已完成了子历程,然后,对主历程的完成就变得十分复杂了(列表4)[3]。- <workflow-appxmlns=uri:oozie:workflow:0.1name=processDirsWF><startto=getDirs2Process/><!--STEPONE--><actionname=getDirs2Process><!--writes2properties:dir.num-files:returns-1ifdirdoesntexist,otherwisereturns#offilesindirdir.age:returns-1ifdirdoesntexist,otherwisereturnsageofdirindays--><java><job-tracker>${jobTracker}</job-tracker><name-node>${nameNode}</name-node><main-class>com.navteq.oozie.GenerateLookupDirs</main-class><capture-output/></java><okto="forkSubWorkflows"/><errorto="fail"/></action><forkname="forkSubWorkflows"><pathstart="processDir0"/><pathstart="processDir1"/><pathstart="processDir2"/><pathstart="processDir3"/><pathstart="processDir4"/><pathstart="processDir5"/><pathstart="processDir6"/><pathstart="processDir7"/></fork><actionname="processDir0"><sub-workflow><app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path><configuration><property><name>inputDir</name><value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData(getDirs2Process)[dir0]}</value></property><property><name>outputDir</name><value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData(getDirs2Process)[dir0]}</value></property><property><name>jobTracker</name><value>${jobTracker}</value></property><property><name>nameNode</name><value>${nameNode}</value></property><property><name>activeDir</name><value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value></property><property><name>dirName</name><value>${wf:actionData(getDirs2Process)[dir0]}</value></property></configuration></sub-workflow><okto="joining"/><errorto="fail"/></action>….<actionname="processDir7"><sub-workflow><app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path><configuration><property><name>inputDir</name><value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData(getDirs2Process)[dir7]}</value></property><property><name>outputDir</name><value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData(getDirs2Process)[dir7]}</value></property><property><name>dirName</name><value>${wf:actionData(getDirs2Process)[dir7]}</value></property></configuration></sub-workflow><okto="joining"/><errorto="fail"/></action><joinname="joining"to="end"/><killname="fail"><message>Javafailed,errormessage[${wf:errorMessage(wf:lastErrorNode())}]</message></kill><endname=end/></workflow-app>
复制代码 代码4:数据猎取主历程
这个历程起首会触发java节点,盘算必要处置的目次列表(列表5),然后对每一个目次实行子历程,从而处置给定的目次。- packagecom.navteq.oozie;importjava.io.File;importjava.io.FileOutputStream;importjava.io.OutputStream;importjava.util.Calendar;importjava.util.GregorianCalendar;importjava.util.Properties;publicclassGenerateLookupDirs{publicstaticfinallongdayMillis=1000*60*60*24;privatestaticfinalStringOOZIE_ACTION_OUTPUT_PROPERTIES="oozie.action.output.properties";publicstaticvoidmain(String[]args)throwsException{CalendarcurDate=newGregorianCalendar();intyear,month,date;StringpropKey,propVal;StringoozieProp=System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);if(oozieProp!=null){FilepropFile=newFile(oozieProp);Propertiesprops=newProperties();for(inti=0;i<8;++i){year=curDate.get(Calendar.YEAR);month=curDate.get(Calendar.MONTH)+1;date=curDate.get(Calendar.DATE);propKey="dir"+i;propVal=year+"-"+(month<10?"0"+month:month)+"-"+(date<10?"0"+date:date);props.setProperty(propKey,propVal);curDate.setTimeInMillis(curDate.getTimeInMillis()-dayMillis);}OutputStreamos=newFileOutputStream(propFile);props.store(os,"");os.close();}elsethrownewRuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES+"Systempropertynotdefined");}}
复制代码 代码5:目次盘算程序
结论
在这篇文章中,我们向你展现了一个更庞大的完全的事情流示例,它让我们能够演示更多的Oozie特征和对它们的使用。鄙人一篇文章中,我们会会商构建可重用的Oozie组件库,并利用自界说的节点扩大Oozie。
申谢
十分感激我们在Navteq的同事GregoryTitievsky,他为我们完成了年夜部分代码。
关于作者
BorisLublinsky是NAVTEQ公司的首席架构师,在这家公司中他的事情是为年夜型数据办理和处置、SOA和完成各类NAVTEQ的项目界说架构的愿景。他仍是InfoQ的SOA编纂,和OASIS的SOARA事情组的介入者。Boris是一名作者,还常常宣布演讲,他最新的一本书是《AppliedSOA》。
MichaelSegel在已往二十多年间一向与客户写作,辨认并办理他们的营业成绩。Michael已作为多种脚色、在多个行业中事情过。他是一名自力参谋,老是希冀可以办理一切有应战的成绩。Michael具有俄亥俄州立年夜学的软件工程学位。
参考信息
1.BorisLublinsky,MikeSegel《Oozie简介》
[1]目次的称号是汇集这条数据的日期。
[2]这是已存在的程序,对它的形貌与本文有关。
[3]在此省略了一些反复代码。
检察英文原文:OoziebyExample
java是一种面向对象的编程语言,优点是可移植性比较高,最初设计时就是本着一次编写到处执行设计的。可以开发各种应用程序和游戏,不过速度没有c++快,所以一般是不用java来编写应用程序和电脑游戏。 |
|