仓酷云

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 491|回复: 13
打印 上一主题 下一主题

[学习教程] JAVA教程之Java Worker 计划形式仓酷云

[复制链接]
乐观 该用户已被删除
跳转到指定楼层
楼主
发表于 2015-1-18 11:24:51 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
windows系统样,他们做了什么事或者留了一些后门程序,谁都不知道,二,java开发是跨平台,任何系统上都可以运行,对于保密型系统和大型系统开发这是必要的
Worker形式

想办理的成绩

异步实行一些义务,有前往或无前往了局
利用念头

有些时分想实行一些异步义务,如异步收集通讯、daemon义务,但又不想往办理这义务的性命周。这个时分可使用Worker形式,它会帮您办理与实行义务,并能十分便利地猎取了局
布局

良多人大概为以为这与executor很像,但executor是多线程的,它的感化更像是一个计划中央。而Worker则只是个搬运工,它本人自己只要一个线程的。每一个worker有本人的义务处置逻辑,为了完成这个目标,有两种体例
1.创建一个笼统的AbstractWorker,分歧逻辑的worker对其举行分歧的完成;
2.对worker新增一个TaskProcessor分歧的义务传进分歧的processor便可。
第二种体例worker的脚色能够很便利地改动,并且能够随时改换processor,能够了解成可”刷机”的worker
^^。这里我们利用第二种体例来先容此形式的全体布局。
针对上图,具体先容一下几个脚色:


  • ConfigurableWorker:望文生义这个就是真正干活的worker了。要完成自我性命周期办理,必要完成Runable,如许其才干以独自的线程运转,必要注重的是work最好以daemon线程的体例运转。worker内里还包含几个别的成员:taskQueue,一个堵塞性子的queue,一样平常BlockingArrayList就能够了,如许义务是FIFO(先辈先出)的,假如要思索义务的优先级,则能够思索利用PriorityBlockingQueue;listeners,依据事务举行分别的事务监听者,以便于当一个义务完成的时分举行处置,必要注重的是,为了较高效地举行listener遍历,这里我保举利用CopyOnWriteArrayList,以免每次都复制。其对应的办法有addlistener、addTask等配套办法,这个都未几说了,更具体的能够看前面的示例代码。
  • WorkerTask:实践上这是一个笼统的工内容,其包含基础的id与,task的ID是Worker天生的,相称于递wtte后的一个执回,当数据实行完了的时分必要利用这个id来取了局。尔后面真正完成的实体task则包括义务处置时必要的数据。
  • Processor:为了完成可”刷机”的worker,我们将处置逻辑与worker分隔来,processor的本职事情很复杂,只必要加工传进的task数据便可,加工完成后触发fireEvent(WorkerEvent.TASK_COMPLETE)事务,以后经由过程Future的get便可失掉终极的数据。
别的再说一点,关于addTask,能够有一个overload的办法,即在输出task的同时,传进一个RejectPolice,如许能够在size过年夜的时分做出回绝操纵,无效制止被撑逝世。
合用性/成绩

这类计划能主动处置义务,并能依据义务的优先级主动调治义务的实行按次,一个完整自力的thread,你完整能够将其了解成一专门卖力干某种活的”呆板人”。它能够用于处置一些准时、哀求量流动匀称且对及时性请求不是太高的义务,如日记纪录,数据剖析等。固然,假如想进步义务处置的数据,能够天生多个worker,就相称于雇佣更多的人来为你干活,十分直不雅的。固然如许一来,谁来保护这worker便成了一个成绩,别的就今朝这类计划下worker之间是没有通讯与协同的,这些都是改善点。
那末关于多个worker,有甚么构造体例呢?这里我先容三种,算是举一反三:
流水线式worker(assembly-lineworker)

就像临盆车间上的流水线工人一样,将义务切分红几个小块,每一个worker卖力本人的一部分,以进步全体的临盆、产出效力,以下图:

假定完成义务t必要的工夫为:W(t)=n,那末将义务分化成m份,流水线式的实行,每小份必要的工夫便为W(t/m)=n/m,那末实行1000条义务的工夫,单个为1000n,流水线长度为L,则用这类体例所用的工夫为(1000-1)*(m-L+1)*n/m+n
个中L<m,因而可知,流水线的worker越多、义务越细分,事情的效力将越高。这类主体例的成绩在于,假如一个worker呈现成绩,那末全部流水线就将中断事情。并且义务的优先级不克不及静态挪用,必需事前告诉。
多级反应行列(MultilevelFeedbackQueue)

这是一个有Q1、Q2…Qn个多重流水线体例,从高到低分离代码分歧的优先级,高优先级的worker要多于低优先级的,通常为2的倍数,即Q4有16个worker、Q3有8个,前面类推。义务依据事后估量好的优先级进进,假如义务在某步的实行太长,间接踢到下一级,让出最快的资本。以下图所示:

明显这类体例的优点就在于能够静态地调剂义务的优级,实时做出反响。固然,为了完成更好的高度,我们能够在初级里增添一个阀值,使得放偶尔放进初级的task能够有回生的时机^
^。
MapReduce式

流水线固然有必定的并行性,但整体来讲仍旧是串行的,由于只需有一个节点出了成绩,那都是致命的毛病。MapReduce是Google领先完成的一个散布式算法,有十分好的并行实行效力。

如上图所示,只需我们将Map与Reduce都改成Worker就好了,如MapWorker与ReduceWorker。如许,能够瞥见,Map的历程是完整并行的,固然如许就必要在Map与Reduce上的分派与数据组合上稍稍下一点工夫了。
样例完成

这里我们完成一个PageURLMiningWorker,对给定的URL,翻开页面后,接纳一切的URL,并反回了局举行汇总输入。因为工夫无限,这里我只完成了单worker与MapReduceworker集两种体例,有乐趣的同砚能够完成别的范例,如多级反应行列。注重!我这里只是向人人展现这类计划形式,URL
抓取的效力不在本次思索之列。
一切的代码能够在这里猎取:https://github.com/sefler1987/javaworker
单Worker完成样例



  • packagecom.alibaba.taobao.main;

  • importjava.util.Arrays;
  • importjava.util.List;
  • importjava.util.concurrent.ConcurrentHashMap;
  • importjava.util.concurrent.ConcurrentSkipListSet;
  • importjava.util.concurrent.TimeUnit;

  • importcom.alibaba.taobao.worker.ConfigurableWorker;
  • importcom.alibaba.taobao.worker.SimpleURLComparator;
  • importcom.alibaba.taobao.worker.WorkerEvent;
  • importcom.alibaba.taobao.worker.WorkerListener;
  • importcom.alibaba.taobao.worker.WorkerTask;
  • importcom.alibaba.taobao.worker.linear.PageURLMiningProcessor;
  • importcom.alibaba.taobao.worker.linear.PageURLMiningTask;

  • /**
  • *LinearversionofpageURLmining.It’sslowbutsimple.
  • *Averagetimecostfor1000URLsis:3800ms
  • *
  • *@authorxuanyin.zyE-mail:xuanyin.zy@taobao.com
  • *@sinceSep16,20125:35:40PM
  • */
  • publicclassLinearURLMiningMainimplementsWorkerListener{
  • privatestaticfinalStringEMPTY_STRING=”";

  • privatestaticfinalintURL_SIZE_TO_MINE=10000;

  • privatestaticConcurrentHashMap<String,WorkerTask<?>>taskID2TaskMap=newConcurrentHashMap<String,WorkerTask<?>>();

  • privatestaticConcurrentSkipListSet<String>foundURLs=newConcurrentSkipListSet<String>(newSimpleURLComparator());

  • publicstaticvoidmain(String[]args)throwsInterruptedException{
  • longstartTime=System.currentTimeMillis();

  • ConfigurableWorkerworker=newConfigurableWorker(“W001&Prime;);
  • worker.setTaskProcessor(newPageURLMiningProcessor());

  • addTask2Worker(worker,newPageURLMiningTask(“http://www.taobao.com”));
  • addTask2Worker(worker,newPageURLMiningTask(“http://www.xinhuanet.com”));
  • addTask2Worker(worker,newPageURLMiningTask(“http://www.zol.com.cn”));
  • addTask2Worker(worker,newPageURLMiningTask(“http://www.ckuyun.com”));

  • LinearURLMiningMainmainListener=newLinearURLMiningMain();
  • worker.addListener(mainListener);

  • worker.start();

  • StringtargetURL=EMPTY_STRING;
  • while(foundURLs.size()<URL_SIZE_TO_MINE){
  • targetURL=foundURLs.pollFirst();

  • if(targetURL==null){
  • TimeUnit.MILLISECONDS.sleep(50);
  • continue;
  • }

  • PageURLMiningTasktask=newPageURLMiningTask(targetURL);
  • taskID2TaskMap.putIfAbsent(worker.addTask(task),task);

  • TimeUnit.MILLISECONDS.sleep(100);
  • }

  • worker.stop();

  • for(Stringstring:foundURLs){
  • System.out.println(string);
  • }

  • System.out.println(“TimeCost:”+(System.currentTimeMillis()-startTime)+”ms”);
  • }

  • privatestaticvoidaddTask2Worker(ConfigurableWorkermapWorker_1,PageURLMiningTasktask){
  • StringtaskID=mapWorker_1.addTask(task);
  • taskID2TaskMap.put(taskID,task);
  • }

  • @Override
  • publicList<WorkerEvent>intrests(){
  • returnArrays.asList(WorkerEvent.TASK_COMPLETE,WorkerEvent.TASK_FAILED);
  • }

  • @Override
  • publicvoidonEvent(WorkerEventevent,Object…args){
  • if(WorkerEvent.TASK_FAILED==event){
  • System.err.println(“ErrorwhileextractingURLs”);
  • return;
  • }

  • if(WorkerEvent.TASK_COMPLETE!=event)
  • return;

  • PageURLMiningTasktask=(PageURLMiningTask)args[0];
  • if(!taskID2TaskMap.containsKey(task.getTaskID()))
  • return;

  • foundURLs.addAll(task.getMinedURLs());

  • System.out.println(“FoundURLsize:”+foundURLs.size());

  • taskID2TaskMap.remove(task.getTaskID());
  • }
  • }

MapReduce完成样例



  • packagecom.alibaba.taobao.main;

  • importjava.util.ArrayList;
  • importjava.util.Arrays;
  • importjava.util.List;
  • importjava.util.concurrent.ConcurrentHashMap;
  • importjava.util.concurrent.ConcurrentSkipListSet;
  • importjava.util.concurrent.TimeUnit;

  • importcom.alibaba.taobao.worker.ConfigurableWorker;
  • importcom.alibaba.taobao.worker.SimpleURLComparator;
  • importcom.alibaba.taobao.worker.WorkerEvent;
  • importcom.alibaba.taobao.worker.WorkerListener;
  • importcom.alibaba.taobao.worker.WorkerTask;
  • importcom.alibaba.taobao.worker.mapreduce.Map2ReduceConnector;
  • importcom.alibaba.taobao.worker.mapreduce.MapReducePageURLMiningTask;
  • importcom.alibaba.taobao.worker.mapreduce.PageContentFetchProcessor;
  • importcom.alibaba.taobao.worker.mapreduce.URLMatchingProcessor;

  • /**
  • *MapReduceversionofpageURLmining.It’sverypowerful.
  • *
  • *@authorxuanyin.zyE-mail:xuanyin.zy@taobao.com
  • *@sinceSep16,20125:35:40PM
  • */
  • publicclassMapReduceURLMiningMainimplementsWorkerListener{
  • privatestaticfinalStringEMPTY_STRING=”";

  • privatestaticfinalintURL_SIZE_TO_MINE=10000;

  • privatestaticConcurrentHashMap<String,WorkerTask<?>>taskID2TaskMap=newConcurrentHashMap<String,WorkerTask<?>>();

  • privatestaticConcurrentSkipListSet<String>foundURLs=newConcurrentSkipListSet<String>(newSimpleURLComparator());

  • publicstaticvoidmain(String[]args)throwsInterruptedException{
  • longstartTime=System.currentTimeMillis();

  • //fourmapers
  • List<ConfigurableWorker>mappers=newArrayList<ConfigurableWorker>(4);

  • ConfigurableWorkermapWorker_1=newConfigurableWorker(“W_M1&Prime;);
  • ConfigurableWorkermapWorker_2=newConfigurableWorker(“W_M2&Prime;);
  • ConfigurableWorkermapWorker_3=newConfigurableWorker(“W_M3&Prime;);
  • ConfigurableWorkermapWorker_4=newConfigurableWorker(“W_M4&Prime;);
  • mapWorker_1.setTaskProcessor(newPageContentFetchProcessor());
  • mapWorker_2.setTaskProcessor(newPageContentFetchProcessor());
  • mapWorker_3.setTaskProcessor(newPageContentFetchProcessor());
  • mapWorker_4.setTaskProcessor(newPageContentFetchProcessor());

  • mappers.add(mapWorker_1);
  • mappers.add(mapWorker_2);
  • mappers.add(mapWorker_3);
  • mappers.add(mapWorker_4);

  • //onereducer
  • ConfigurableWorkerreduceWorker_1=newConfigurableWorker(“W_R1&Prime;);
  • reduceWorker_1.setTaskProcessor(newURLMatchingProcessor());

  • //bindreducertofinalresultclass
  • MapReduceURLMiningMainmain=newMapReduceURLMiningMain();
  • reduceWorker_1.addListener(main);

  • //initiatetasks
  • addTask2Worker(mapWorker_1,newMapReducePageURLMiningTask(“http://www.taobao.com”));
  • addTask2Worker(mapWorker_2,newMapReducePageURLMiningTask(“http://www.xinhuanet.com”));
  • addTask2Worker(mapWorker_3,newMapReducePageURLMiningTask(“http://www.zol.com.cn”));
  • addTask2Worker(mapWorker_4,newMapReducePageURLMiningTask(“http://www.ckuyun.com.cn/”));

  • //bindmappertoreduer
  • Map2ReduceConnectorconnector=newMap2ReduceConnector(Arrays.asList(reduceWorker_1));
  • mapWorker_1.addListener(connector);
  • mapWorker_2.addListener(connector);
  • mapWorker_3.addListener(connector);
  • mapWorker_4.addListener(connector);

  • //startall
  • mapWorker_1.start();
  • mapWorker_2.start();
  • mapWorker_3.start();
  • mapWorker_4.start();
  • reduceWorker_1.start();

  • StringtargetURL=EMPTY_STRING;
  • intlastIndex=0;
  • while(foundURLs.size()<URL_SIZE_TO_MINE){
  • targetURL=foundURLs.pollFirst();

  • if(targetURL==null){
  • TimeUnit.MILLISECONDS.sleep(50);
  • continue;
  • }

  • lastIndex=++lastIndex%mappers.size();
  • MapReducePageURLMiningTasktask=newMapReducePageURLMiningTask(targetURL);
  • taskID2TaskMap.putIfAbsent(mappers.get(lastIndex).addTask(task),task);

  • TimeUnit.MILLISECONDS.sleep(100);
  • }

  • //stopall
  • mapWorker_1.stop();
  • mapWorker_2.stop();
  • mapWorker_3.stop();
  • mapWorker_4.stop();
  • reduceWorker_1.stop();

  • for(Stringstring:foundURLs){
  • System.out.println(string);
  • }

  • System.out.println(“TimeCost:”+(System.currentTimeMillis()-startTime)+”ms”);
  • }

  • privatestaticvoidaddTask2Worker(ConfigurableWorkermapWorker_1,MapReducePageURLMiningTasktask){
  • StringtaskID=mapWorker_1.addTask(task);
  • taskID2TaskMap.put(taskID,task);
  • }

  • @Override
  • publicList<WorkerEvent>intrests(){
  • returnArrays.asList(WorkerEvent.TASK_COMPLETE,WorkerEvent.TASK_FAILED);
  • }

  • @Override
  • publicvoidonEvent(WorkerEventevent,Object…args){
  • if(WorkerEvent.TASK_FAILED==event){
  • System.err.println(“ErrorwhileextractingURLs”);
  • return;
  • }

  • if(WorkerEvent.TASK_COMPLETE!=event)
  • return;

  • MapReducePageURLMiningTasktask=(MapReducePageURLMiningTask)args[0];
  • if(!taskID2TaskMap.containsKey(task.getTaskID()))
  • return;

  • foundURLs.addAll(task.getMinedURLs());

  • System.out.println(“FoundURLsize:”+foundURLs.size());

  • taskID2TaskMap.remove(task.getTaskID());
  • }
  • }

了局对照

Y轴为抓取X轴URL个数所用的工夫


总结

我们能够看到,worker形式组合长短常天真的,它真的就像一个活生生的工人,任你分配。利用worker,我们能够更便利地完成更庞大的布局。

net网页编程程序员的大部门代码都靠控件拖拽完成的,虽然java也有,但是无论从美观和速度上都没发和.net网页编程比。java程序员都是代码完成的,所以java程序员常戏称.net网页编程程序员是操作员,呵呵。
小妖女 该用户已被删除
沙发
发表于 2015-1-20 17:29:48 | 只看该作者
有时间再研究一下MVC结构(把Model-View-Control分离开的设计思想)
海妖 该用户已被删除
板凳
发表于 2015-1-25 11:15:26 | 只看该作者
Pet Store.(宠物店)是SUN公司为了演示其J2EE编程规范而推出的开放源码的程序,应该很具有权威性,想学J2EE和EJB的朋友不要 错过了。
灵魂腐蚀 该用户已被删除
地板
发表于 2015-2-2 21:49:28 | 只看该作者
一直感觉JAVA很大,很杂,找不到学习方向,前两天在网上找到了这篇文章,感觉不错,给没有方向的我指了一个方向,先不管对不对,做下来再说。
谁可相欹 该用户已被删除
5#
发表于 2015-2-2 22:03:30 | 只看该作者
是一种语言,用以产生「小应用程序(Applet(s))
山那边是海 该用户已被删除
6#
发表于 2015-2-5 18:52:22 | 只看该作者
吧,现在很流行的Structs就是它的一种实现方式,不过Structs用起来实在是很繁,我们只要学习其精髓即可,我们完全可以设计自己的MVC结构。然后你再研究一下软件Refactoring (重构)和极限XP编程,相信你又会上一个台阶。 做完这些,你不如整理一下你的Java代码,把那些经典的程序和常见的应用整理出来,再精心打造一番,提高其重用性和可扩展性。你再找几个志同道合的朋友成立一个工作室吧
小女巫 该用户已被删除
7#
发表于 2015-2-13 04:51:41 | 只看该作者
另外编写和运行Java程序需要JDK(包括JRE),在sun的官方网站上有下载,thinking in java第三版用的JDK版本是1.4,现在流行的版本1.5(sun称作J2SE 5.0,汗),不过听说Bruce的TIJ第四版国外已经出来了,是专门为J2SE 5.0而写的。
飘飘悠悠 该用户已被删除
8#
发表于 2015-2-20 18:02:50 | 只看该作者
学Java必读的两个开源程序就是Jive和Pet Store.。 Jive是国外一个非常著名的BBS程序,完全开放源码。论坛的设计采用了很多先进的技术,如Cache、用户认证、Filter、XML等,而且论坛完全屏蔽了对数据库的访问,可以很轻易的在不同数据库中移植。论坛还有方便的安装和管理程序,这是我们平时编程时容易忽略的一部份(中国程序员一般只注重编程的技术含量,却完全不考虑用户的感受,这就是我们与国外软件的差距所在)。
兰色精灵 该用户已被删除
9#
发表于 2015-2-21 06:40:01 | 只看该作者
另外编写和运行Java程序需要JDK(包括JRE),在sun的官方网站上有下载,thinking in java第三版用的JDK版本是1.4,现在流行的版本1.5(sun称作J2SE 5.0,汗),不过听说Bruce的TIJ第四版国外已经出来了,是专门为J2SE 5.0而写的。
再现理想 该用户已被删除
10#
发表于 2015-3-5 19:31:01 | 只看该作者
Pet Store.(宠物店)是SUN公司为了演示其J2EE编程规范而推出的开放源码的程序,应该很具有权威性,想学J2EE和EJB的朋友不要 错过了。
老尸 该用户已被删除
11#
发表于 2015-3-6 07:17:27 | 只看该作者
其实说这种话的人就如当年小日本号称“三个月拿下中国”一样大言不惭。不是Tomjava泼你冷水,你现在只是学到了Java的骨架,却还没有学到Java的精髓。接下来你得研究设计模式了。
精灵巫婆 该用户已被删除
12#
发表于 2015-3-8 15:12:01 | 只看该作者
是一种将安全性(Security)列为第一优先考虑的语言
蒙在股里 该用户已被删除
13#
发表于 2015-3-16 02:42:32 | 只看该作者
你可以去承接一些项目做了,一开始可能有些困难,可是你有技术积累,又考虑周全,接下项目来可以迅速作完,相信大家以后都会来找你的,所以Money就哗啦啦的。。。。。。
第二个灵魂 该用户已被删除
14#
发表于 2015-3-22 18:52:34 | 只看该作者
Jive的资料在很多网站上都有,大家可以找来研究一下。相信你读完代码后,会有脱胎换骨的感觉。遗憾的是Jive从2.5以后就不再无条件的开放源代码,同时有licence限制。不过幸好还有中国一流的Java程序员关注它,外国人不开源了,中国人就不能开源吗?这里向大家推荐一个汉化的Jive版本—J道。Jive(J道版)是由中国Java界大名 鼎鼎的banq在Jive 2.1版本基础上改编而成, 全中文,增加了一些实用功能,如贴图,用户头像和用户资料查询等,而且有一个开发团队在不断升级。你可以访问banq的网站
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|Archiver|手机版|仓酷云 鄂ICP备14007578号-2

GMT+8, 2024-12-24 03:48

Powered by Discuz! X3.2

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表