|
马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有帐号?立即注册
x
windows系统样,他们做了什么事或者留了一些后门程序,谁都不知道,二,java开发是跨平台,任何系统上都可以运行,对于保密型系统和大型系统开发这是必要的
Worker形式
想办理的成绩
异步实行一些义务,有前往或无前往了局
利用念头
有些时分想实行一些异步义务,如异步收集通讯、daemon义务,但又不想往办理这义务的性命周。这个时分可使用Worker形式,它会帮您办理与实行义务,并能十分便利地猎取了局
布局
良多人大概为以为这与executor很像,但executor是多线程的,它的感化更像是一个计划中央。而Worker则只是个搬运工,它本人自己只要一个线程的。每一个worker有本人的义务处置逻辑,为了完成这个目标,有两种体例
1.创建一个笼统的AbstractWorker,分歧逻辑的worker对其举行分歧的完成;
2.对worker新增一个TaskProcessor分歧的义务传进分歧的processor便可。
第二种体例worker的脚色能够很便利地改动,并且能够随时改换processor,能够了解成可”刷机”的worker
^^。这里我们利用第二种体例来先容此形式的全体布局。
针对上图,具体先容一下几个脚色:
- ConfigurableWorker:望文生义这个就是真正干活的worker了。要完成自我性命周期办理,必要完成Runable,如许其才干以独自的线程运转,必要注重的是work最好以daemon线程的体例运转。worker内里还包含几个别的成员:taskQueue,一个堵塞性子的queue,一样平常BlockingArrayList就能够了,如许义务是FIFO(先辈先出)的,假如要思索义务的优先级,则能够思索利用PriorityBlockingQueue;listeners,依据事务举行分别的事务监听者,以便于当一个义务完成的时分举行处置,必要注重的是,为了较高效地举行listener遍历,这里我保举利用CopyOnWriteArrayList,以免每次都复制。其对应的办法有addlistener、addTask等配套办法,这个都未几说了,更具体的能够看前面的示例代码。
- WorkerTask:实践上这是一个笼统的工内容,其包含基础的id与,task的ID是Worker天生的,相称于递wtte后的一个执回,当数据实行完了的时分必要利用这个id来取了局。尔后面真正完成的实体task则包括义务处置时必要的数据。
- Processor:为了完成可”刷机”的worker,我们将处置逻辑与worker分隔来,processor的本职事情很复杂,只必要加工传进的task数据便可,加工完成后触发fireEvent(WorkerEvent.TASK_COMPLETE)事务,以后经由过程Future的get便可失掉终极的数据。
别的再说一点,关于addTask,能够有一个overload的办法,即在输出task的同时,传进一个RejectPolice,如许能够在size过年夜的时分做出回绝操纵,无效制止被撑逝世。
合用性/成绩
这类计划能主动处置义务,并能依据义务的优先级主动调治义务的实行按次,一个完整自力的thread,你完整能够将其了解成一专门卖力干某种活的”呆板人”。它能够用于处置一些准时、哀求量流动匀称且对及时性请求不是太高的义务,如日记纪录,数据剖析等。固然,假如想进步义务处置的数据,能够天生多个worker,就相称于雇佣更多的人来为你干活,十分直不雅的。固然如许一来,谁来保护这worker便成了一个成绩,别的就今朝这类计划下worker之间是没有通讯与协同的,这些都是改善点。
那末关于多个worker,有甚么构造体例呢?这里我先容三种,算是举一反三:
流水线式worker(assembly-lineworker)
就像临盆车间上的流水线工人一样,将义务切分红几个小块,每一个worker卖力本人的一部分,以进步全体的临盆、产出效力,以下图:
假定完成义务t必要的工夫为:W(t)=n,那末将义务分化成m份,流水线式的实行,每小份必要的工夫便为W(t/m)=n/m,那末实行1000条义务的工夫,单个为1000n,流水线长度为L,则用这类体例所用的工夫为(1000-1)*(m-L+1)*n/m+n
个中L<m,因而可知,流水线的worker越多、义务越细分,事情的效力将越高。这类主体例的成绩在于,假如一个worker呈现成绩,那末全部流水线就将中断事情。并且义务的优先级不克不及静态挪用,必需事前告诉。
多级反应行列(MultilevelFeedbackQueue)
这是一个有Q1、Q2…Qn个多重流水线体例,从高到低分离代码分歧的优先级,高优先级的worker要多于低优先级的,通常为2的倍数,即Q4有16个worker、Q3有8个,前面类推。义务依据事后估量好的优先级进进,假如义务在某步的实行太长,间接踢到下一级,让出最快的资本。以下图所示:
明显这类体例的优点就在于能够静态地调剂义务的优级,实时做出反响。固然,为了完成更好的高度,我们能够在初级里增添一个阀值,使得放偶尔放进初级的task能够有回生的时机^
^。
MapReduce式
流水线固然有必定的并行性,但整体来讲仍旧是串行的,由于只需有一个节点出了成绩,那都是致命的毛病。MapReduce是Google领先完成的一个散布式算法,有十分好的并行实行效力。
如上图所示,只需我们将Map与Reduce都改成Worker就好了,如MapWorker与ReduceWorker。如许,能够瞥见,Map的历程是完整并行的,固然如许就必要在Map与Reduce上的分派与数据组合上稍稍下一点工夫了。
样例完成
这里我们完成一个PageURLMiningWorker,对给定的URL,翻开页面后,接纳一切的URL,并反回了局举行汇总输入。因为工夫无限,这里我只完成了单worker与MapReduceworker集两种体例,有乐趣的同砚能够完成别的范例,如多级反应行列。注重!我这里只是向人人展现这类计划形式,URL
抓取的效力不在本次思索之列。
一切的代码能够在这里猎取:https://github.com/sefler1987/javaworker
单Worker完成样例
- packagecom.alibaba.taobao.main;
- importjava.util.Arrays;
- importjava.util.List;
- importjava.util.concurrent.ConcurrentHashMap;
- importjava.util.concurrent.ConcurrentSkipListSet;
- importjava.util.concurrent.TimeUnit;
- importcom.alibaba.taobao.worker.ConfigurableWorker;
- importcom.alibaba.taobao.worker.SimpleURLComparator;
- importcom.alibaba.taobao.worker.WorkerEvent;
- importcom.alibaba.taobao.worker.WorkerListener;
- importcom.alibaba.taobao.worker.WorkerTask;
- importcom.alibaba.taobao.worker.linear.PageURLMiningProcessor;
- importcom.alibaba.taobao.worker.linear.PageURLMiningTask;
- /**
- *LinearversionofpageURLmining.It’sslowbutsimple.
- *Averagetimecostfor1000URLsis:3800ms
- *
- *@authorxuanyin.zyE-mail:xuanyin.zy@taobao.com
- *@sinceSep16,20125:35:40PM
- */
- publicclassLinearURLMiningMainimplementsWorkerListener{
- privatestaticfinalStringEMPTY_STRING=”";
- privatestaticfinalintURL_SIZE_TO_MINE=10000;
- privatestaticConcurrentHashMap<String,WorkerTask<?>>taskID2TaskMap=newConcurrentHashMap<String,WorkerTask<?>>();
- privatestaticConcurrentSkipListSet<String>foundURLs=newConcurrentSkipListSet<String>(newSimpleURLComparator());
- publicstaticvoidmain(String[]args)throwsInterruptedException{
- longstartTime=System.currentTimeMillis();
- ConfigurableWorkerworker=newConfigurableWorker(“W001″);
- worker.setTaskProcessor(newPageURLMiningProcessor());
- addTask2Worker(worker,newPageURLMiningTask(“http://www.taobao.com”));
- addTask2Worker(worker,newPageURLMiningTask(“http://www.xinhuanet.com”));
- addTask2Worker(worker,newPageURLMiningTask(“http://www.zol.com.cn”));
- addTask2Worker(worker,newPageURLMiningTask(“http://www.ckuyun.com”));
- LinearURLMiningMainmainListener=newLinearURLMiningMain();
- worker.addListener(mainListener);
- worker.start();
- StringtargetURL=EMPTY_STRING;
- while(foundURLs.size()<URL_SIZE_TO_MINE){
- targetURL=foundURLs.pollFirst();
- if(targetURL==null){
- TimeUnit.MILLISECONDS.sleep(50);
- continue;
- }
- PageURLMiningTasktask=newPageURLMiningTask(targetURL);
- taskID2TaskMap.putIfAbsent(worker.addTask(task),task);
- TimeUnit.MILLISECONDS.sleep(100);
- }
- worker.stop();
- for(Stringstring:foundURLs){
- System.out.println(string);
- }
- System.out.println(“TimeCost:”+(System.currentTimeMillis()-startTime)+”ms”);
- }
- privatestaticvoidaddTask2Worker(ConfigurableWorkermapWorker_1,PageURLMiningTasktask){
- StringtaskID=mapWorker_1.addTask(task);
- taskID2TaskMap.put(taskID,task);
- }
- @Override
- publicList<WorkerEvent>intrests(){
- returnArrays.asList(WorkerEvent.TASK_COMPLETE,WorkerEvent.TASK_FAILED);
- }
- @Override
- publicvoidonEvent(WorkerEventevent,Object…args){
- if(WorkerEvent.TASK_FAILED==event){
- System.err.println(“ErrorwhileextractingURLs”);
- return;
- }
- if(WorkerEvent.TASK_COMPLETE!=event)
- return;
- PageURLMiningTasktask=(PageURLMiningTask)args[0];
- if(!taskID2TaskMap.containsKey(task.getTaskID()))
- return;
- foundURLs.addAll(task.getMinedURLs());
- System.out.println(“FoundURLsize:”+foundURLs.size());
- taskID2TaskMap.remove(task.getTaskID());
- }
- }
MapReduce完成样例
- packagecom.alibaba.taobao.main;
- importjava.util.ArrayList;
- importjava.util.Arrays;
- importjava.util.List;
- importjava.util.concurrent.ConcurrentHashMap;
- importjava.util.concurrent.ConcurrentSkipListSet;
- importjava.util.concurrent.TimeUnit;
- importcom.alibaba.taobao.worker.ConfigurableWorker;
- importcom.alibaba.taobao.worker.SimpleURLComparator;
- importcom.alibaba.taobao.worker.WorkerEvent;
- importcom.alibaba.taobao.worker.WorkerListener;
- importcom.alibaba.taobao.worker.WorkerTask;
- importcom.alibaba.taobao.worker.mapreduce.Map2ReduceConnector;
- importcom.alibaba.taobao.worker.mapreduce.MapReducePageURLMiningTask;
- importcom.alibaba.taobao.worker.mapreduce.PageContentFetchProcessor;
- importcom.alibaba.taobao.worker.mapreduce.URLMatchingProcessor;
- /**
- *MapReduceversionofpageURLmining.It’sverypowerful.
- *
- *@authorxuanyin.zyE-mail:xuanyin.zy@taobao.com
- *@sinceSep16,20125:35:40PM
- */
- publicclassMapReduceURLMiningMainimplementsWorkerListener{
- privatestaticfinalStringEMPTY_STRING=”";
- privatestaticfinalintURL_SIZE_TO_MINE=10000;
- privatestaticConcurrentHashMap<String,WorkerTask<?>>taskID2TaskMap=newConcurrentHashMap<String,WorkerTask<?>>();
- privatestaticConcurrentSkipListSet<String>foundURLs=newConcurrentSkipListSet<String>(newSimpleURLComparator());
- publicstaticvoidmain(String[]args)throwsInterruptedException{
- longstartTime=System.currentTimeMillis();
- //fourmapers
- List<ConfigurableWorker>mappers=newArrayList<ConfigurableWorker>(4);
- ConfigurableWorkermapWorker_1=newConfigurableWorker(“W_M1″);
- ConfigurableWorkermapWorker_2=newConfigurableWorker(“W_M2″);
- ConfigurableWorkermapWorker_3=newConfigurableWorker(“W_M3″);
- ConfigurableWorkermapWorker_4=newConfigurableWorker(“W_M4″);
- mapWorker_1.setTaskProcessor(newPageContentFetchProcessor());
- mapWorker_2.setTaskProcessor(newPageContentFetchProcessor());
- mapWorker_3.setTaskProcessor(newPageContentFetchProcessor());
- mapWorker_4.setTaskProcessor(newPageContentFetchProcessor());
- mappers.add(mapWorker_1);
- mappers.add(mapWorker_2);
- mappers.add(mapWorker_3);
- mappers.add(mapWorker_4);
- //onereducer
- ConfigurableWorkerreduceWorker_1=newConfigurableWorker(“W_R1″);
- reduceWorker_1.setTaskProcessor(newURLMatchingProcessor());
- //bindreducertofinalresultclass
- MapReduceURLMiningMainmain=newMapReduceURLMiningMain();
- reduceWorker_1.addListener(main);
- //initiatetasks
- addTask2Worker(mapWorker_1,newMapReducePageURLMiningTask(“http://www.taobao.com”));
- addTask2Worker(mapWorker_2,newMapReducePageURLMiningTask(“http://www.xinhuanet.com”));
- addTask2Worker(mapWorker_3,newMapReducePageURLMiningTask(“http://www.zol.com.cn”));
- addTask2Worker(mapWorker_4,newMapReducePageURLMiningTask(“http://www.ckuyun.com.cn/”));
- //bindmappertoreduer
- Map2ReduceConnectorconnector=newMap2ReduceConnector(Arrays.asList(reduceWorker_1));
- mapWorker_1.addListener(connector);
- mapWorker_2.addListener(connector);
- mapWorker_3.addListener(connector);
- mapWorker_4.addListener(connector);
- //startall
- mapWorker_1.start();
- mapWorker_2.start();
- mapWorker_3.start();
- mapWorker_4.start();
- reduceWorker_1.start();
- StringtargetURL=EMPTY_STRING;
- intlastIndex=0;
- while(foundURLs.size()<URL_SIZE_TO_MINE){
- targetURL=foundURLs.pollFirst();
- if(targetURL==null){
- TimeUnit.MILLISECONDS.sleep(50);
- continue;
- }
- lastIndex=++lastIndex%mappers.size();
- MapReducePageURLMiningTasktask=newMapReducePageURLMiningTask(targetURL);
- taskID2TaskMap.putIfAbsent(mappers.get(lastIndex).addTask(task),task);
- TimeUnit.MILLISECONDS.sleep(100);
- }
- //stopall
- mapWorker_1.stop();
- mapWorker_2.stop();
- mapWorker_3.stop();
- mapWorker_4.stop();
- reduceWorker_1.stop();
- for(Stringstring:foundURLs){
- System.out.println(string);
- }
- System.out.println(“TimeCost:”+(System.currentTimeMillis()-startTime)+”ms”);
- }
- privatestaticvoidaddTask2Worker(ConfigurableWorkermapWorker_1,MapReducePageURLMiningTasktask){
- StringtaskID=mapWorker_1.addTask(task);
- taskID2TaskMap.put(taskID,task);
- }
- @Override
- publicList<WorkerEvent>intrests(){
- returnArrays.asList(WorkerEvent.TASK_COMPLETE,WorkerEvent.TASK_FAILED);
- }
- @Override
- publicvoidonEvent(WorkerEventevent,Object…args){
- if(WorkerEvent.TASK_FAILED==event){
- System.err.println(“ErrorwhileextractingURLs”);
- return;
- }
- if(WorkerEvent.TASK_COMPLETE!=event)
- return;
- MapReducePageURLMiningTasktask=(MapReducePageURLMiningTask)args[0];
- if(!taskID2TaskMap.containsKey(task.getTaskID()))
- return;
- foundURLs.addAll(task.getMinedURLs());
- System.out.println(“FoundURLsize:”+foundURLs.size());
- taskID2TaskMap.remove(task.getTaskID());
- }
- }
了局对照
Y轴为抓取X轴URL个数所用的工夫
总结
我们能够看到,worker形式组合长短常天真的,它真的就像一个活生生的工人,任你分配。利用worker,我们能够更便利地完成更庞大的布局。
net网页编程程序员的大部门代码都靠控件拖拽完成的,虽然java也有,但是无论从美观和速度上都没发和.net网页编程比。java程序员都是代码完成的,所以java程序员常戏称.net网页编程程序员是操作员,呵呵。 |
|