輕量級分布式RPC框架-編程開發(fā)技術(shù)_第1頁
輕量級分布式RPC框架-編程開發(fā)技術(shù)_第2頁
輕量級分布式RPC框架-編程開發(fā)技術(shù)_第3頁
輕量級分布式RPC框架-編程開發(fā)技術(shù)_第4頁
輕量級分布式RPC框架-編程開發(fā)技術(shù)_第5頁
已閱讀5頁,還剩25頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

1、輕量級分布式rpc框架編程 開發(fā)技術(shù)輕量級分布式rpc框架原文出處:黃勇rpc,即remote procedure cal 1 (遠程過程調(diào)用),說得通俗一點就是:調(diào)用 遠程計算機上的服務(wù),就像調(diào)用本地服務(wù)一樣。rpc可基于http或tcp協(xié)議,web service就是基于http協(xié)議的rpc,它 具有良好的跨平臺性,但其性能卻不如基于tcp協(xié)議的rpco會兩方面會直接 影響rpc的性能,一是傳輸方式,二是序列化。眾所周知,tcp是傳輸層協(xié)議,http是應(yīng)用層協(xié)議,而傳輸層較應(yīng)用層更加底 層,在數(shù)據(jù)傳輸方面,越底層越快,因此,在一般情況f, tcp 一定比http快。 就序列化而言,java

2、捉供了默認的序列化方式,但在高并發(fā)的情況下,這種方 式將會帶來一些性能上的瓶頸,于是市面上出現(xiàn)了一系列優(yōu)秀的序列化框架,比 如:protobuf> kryo> hessian> jackson等,它們可以取代java默認的序列 化,從而提供更高效的性能。為了支持高并發(fā),傳統(tǒng)的阻塞式10顯然不太合適,因此我們需要異步的10, 即nioo java提供了 ni0的解決方案,java 7也提供了更優(yōu)秀的ni0. 2支持, 用java實現(xiàn)ni0并不是遙不可及的事情,只是需要我們熟悉ni0的技術(shù)細 節(jié)。我們需要將服務(wù)部署在分布式壞境下的不同節(jié)點上,通過服務(wù)注冊的方式,讓客 戶端來自動發(fā)

3、現(xiàn)當前可用的服務(wù),并調(diào)用這些服務(wù)。這需要一種服務(wù)注冊表(service registry)的組件,讓它來注冊分布式環(huán)境下所有的服務(wù)地址(包括: 主機名與端口號)。應(yīng)用、服務(wù)、服務(wù)注冊表之間的關(guān)系見下圖:每臺server上可發(fā)布多個service,這些service共用一個host與port, 在分布式環(huán)境下會提供server共同對外提供service。此外,為防止service registry ;1;現(xiàn)單點故障,因此需要將其搭建為集群環(huán)境。木文將為您揭曉開發(fā)輕量級分布式rpc框架的具體過程,該框架基于tcp協(xié) 議,提供了 nto特性,提供高效的序列化方式,同時也具備服務(wù)注冊與發(fā)現(xiàn)的 能力。根

4、據(jù)以上技術(shù)需求,我們可使用如卜技術(shù)選型:1.spring:它是最強大的依賴注入框架,也是業(yè)界的權(quán)威標準。2. netty:它使n10編程更加容易,屏蔽了 java底層的nio細節(jié)。3. protostuff:它基于protobuf序列化舟£架,面向pojo,無需編寫.proto文件。4. zookccpcr:提供服務(wù)注冊與發(fā)現(xiàn)功能,開發(fā)分布式系統(tǒng)的必備選擇,同時它也具備天生的集群能力。1第一步:編寫服務(wù)接口package com.king. zkrpc;/*定義服務(wù)接口*/public interface helloservice string hello(string name);

5、將該接口放在獨立的客戶端jar包屮,以供應(yīng)用使用。2第二步:編寫服務(wù)接口的實現(xiàn)類package com. king, zkrpc;/*實現(xiàn)服務(wù)接口*/rpcservice(helloservice. class) / 指定遠程接口 public class helloservicetmpl implements helloservice 0vcrridepublic string hello(string name) return "hello! " + name;使用rpcservice注解定義在服務(wù)接口的實現(xiàn)類上,需要對該實現(xiàn)類指定遠程接 口,因為實現(xiàn)類可能會實現(xiàn)多個接

6、口,一定要告訴框架哪個才是遠程接口。rpcservice代碼如下:package com. king. zkrpc;import org springframework stcrcotypc. component;import java.lang.annotation. elementtype;import java. 1 ang. armotalion. relention;import java lang, annotation. retent ionpolicy;import java lang annotation. targct;/* rpc接口注解*/target (element

7、type. type)©retention(retentionpolicy. runtime)©component /標明可被spring掃描 public inlerface rpcservice class<?> value ();該注解具備spring的component注解的特性,可被spring掃描。該實現(xiàn)類放在服務(wù)端jar包屮,該jar包還捉供了一些服務(wù)端的配置文件與啟 動服務(wù)的引導(dǎo)程序。3第三步:配置服務(wù)端服務(wù)端spring配置文件名為spring-zk-rpc-server. xml,內(nèi)容如下:<?xml version二"1.

8、0 encoding二utf-8"?><beans xmlns二http:/www. springframework. org/schema/beans/z xmlns: xsi=/http: /www. w3. org/2001/xmlschema-instance/z xmlns:context二http:/www. springframework. org/schema/contextxsi:schcmalocation二http:/www. springframework. org/schcma/bcanshttp:/www. springframework or

9、g/schema/bedns/springbedns-3. 0. xsd http:/www. springframework. org/schema/context http:/www. springframework. org/schema/context/spring-context-3.0.xsd><!配置自動掃包-><context: component-scan base-package二com. king. zkrpc/z/><context:property-placeholderlocation二classpath:rpc-server-c

10、onfig. propertics/><!-配置服務(wù)注冊組件-><bean id二serviceregistry" class二com. king, zkrpe. serviceregistryzz> <constructor-arg name=/zregi stryaddress"value二$registry. address/></bean><!-配置rpc服務(wù)器一><bean id二rpcserver cl ass二com. king, zkrpe. rpcserver><cons

11、true tor-arg n amc=,scrvcraddrcss/,value二$server, address/><cons true tor-arg name=/ serv i cereg i s try/z ref=/zserviceregistry/z/></bean></bcans>具體的配置參數(shù)在rpc-server-config. properties文件屮,內(nèi)容如下: <!- lang: java -># zookeeper 服務(wù)器registry, address二 127. 0. 0. 1:2181# rpc服務(wù)器se

12、rver, address二 127. 0. 0. 1:8000以上配置表明:連接本地的zookeeper服務(wù)器,并在8000端口上發(fā)布rpc服 務(wù)。4第四步:啟動服務(wù)器并發(fā)布服務(wù)為了加載spring配置文件來發(fā)布服務(wù),只需編寫一個引導(dǎo)程序即可:package com. king, zkrpc;importorg. springframcwork. contcxt. support. classpathxmlapplicationcontext;/* rpc服務(wù)啟動入口*/public class rpcbootstrap public static void main(string args

13、) newclasspathxmlapplicationcontext(spring-zkrpc_server. xml);運行rpcbootstrap類的main方法即可啟動服務(wù)端,但還有兩個重要的組件l'曲未 實現(xiàn),它們分別是:serviceregistry與rpcserver,下文會給出具體實現(xiàn)細節(jié)。5第五步:實現(xiàn)服務(wù)注冊使用zookeeper客戶端可輕松實現(xiàn)服務(wù)注冊功能,serviceregistry代碼如下: package com. king, zkrpc;import org.apache, ookeeper*;import org.slf4j. logger;impor

14、t org.slf4j. loggerfactory;import java. io. ioexception;import java. util, concurrent. countdownlatch;/*連接zk注冊中心,創(chuàng)建服務(wù)注冊目錄*/public class serviceregistry private static final logger logger =loggerfactory. getlogger(serviceregistry. class);private countdownlatch latch = new countdownlatch(1);private st

15、ring registryaddress;public serviceregistry(string registryaddress) thi s.regi stryaddress = regi stryaddress;public void register (string data) if (data !二 null) zookeeper zk = connectserver();if (zk != null) createnode(zk, data);private zookeeper connectserver() zookeeper zk = null;try zk = new zo

16、okeeper (regi stryaddress,constant. zk_session_timeout, new watcher() ©overridepublic void process (watchedevent event) /判斷是否已連接zk,連接后計數(shù)器遞減. if (event. getstate()二二event. keeperstate. syncconnected) latch, count dow n(););/若計數(shù)器不為0,則等待. latch, await (); catch (toexception | tnterruptedexception

17、e) logger, error e);return zk;private void crcatenodc(zookccpcr zk, string data) try byte bytes = data. getbytes ();string path 二 zk.create (constant. zk data path, bytes, zoodefs.ids. open_acl_unsafe, createmode. ephemeral_sequenttal);logger, debug(,zcrcatc zookccper node ( => )", path, dat

18、a); catch (keeperexception | interruptedexception e) logger, error e);其中,通過constant配置了所有的常量:package com. king, zkrpc;/* zk相關(guān)常量*/public interface constant int zk_session_timeout = 5000;string zk_registry_path = "/registry"string zk_data_path 二 zk_registry_path + /data;注意:首先需要使用zookccpcr客戶端命

19、令行創(chuàng)建/registry永久節(jié)點,用于 存放所冇的服務(wù)臨時節(jié)點。6第六步:實現(xiàn)rpc服務(wù)器使用netty可實現(xiàn)一個支持ni0的rpc服務(wù)器,需要使用serviceregistry 注冊服務(wù)地址,rpcserver代碼如下:package com. king, zkrpc;import io. netty. bootstrap. serverbootstrap;import ty. channel. channel future;import io. nctty. charincl charincllnitializcr;import ty. channel. channeloption;im

20、port ty.channel. eventloopgroup;import io. netty. channel. nio. nioeventloopgroup;import ty. channel. socket. socketchannel;import io. netty. charmcl socket, nio. nioscrvcrsockctcharmcl;import org.apache, commons. collections4. maputils;import org. slf4j. logger;import org. slfdj. loggerfactory;impo

21、rt org. springframework. beans. beansexception;impor t org. spr in gframework. boa ns. fac to ry. initi alizi ngbcei n; import org.springframework. context. applicationcontext; import org. springframework, context. applicationcontextaware;import java. util. hashmap;import java. util. map;/*啟動并注冊服務(wù)*/

22、public class rpcscrvcr implcments applicationcontcxtawarc, initializingbean private static final logger logger 二loggerfactory. getlogger(rpcserver. class);private string serveraddress;private serviceregistry serviceregistry;private map<string, object> handlermap = new hashmapo () ; / 存放 接口名與服務(wù)

23、對象z間的映射關(guān)系public rpcserver(string serveraddress) this.serveraddress 二 serveraddress;public rpcserver(string serveraddress, serviceregistry serviceregistry) this.serveraddress 二 serveraddress;this.serviceregistry 二 serviceregistry;©overridepublic void setapplicationcontext(applicationcontext ctx)

24、 throws beansexccption map<string, object> servicebeanmap =ctx. getbeanswithannotation (rpcservice. class) ; / 獲取所有帶有 rpcservice 注解的 spring beanif (maputi1s. isnotempty(servicebeanmap) for (object serviccbcan : serviccbcanmap. values () string interfacename =servicebean. getclass (). getannota

25、tion (rpcservice. class). value(). getnam e();handlermap. put(interfacename, servicebean);©overridepublic void aftcrpropcrticssct() throws exccption eventloopgroup bossgroup = new nioeventloopgroupo ; eventloopgroup workergroup 二 new nioeventloopgroupo ; try serverbootstrap bootstrap = new serv

26、erbootstrap(); bootstrap, group(bossgroup,workergroup). channel (nioserversocketchannel. class)childhandler(newchannellnitializersocketchannel>() ©override public void initcharrnc1(sockctcharrncl charmcl) throws exception channel, pipeline() addlast (newrpcdecoder(rpcrequest. class) / 將 rpc

27、請求進行解碼(為了處理請求) addlast (newrpcencoder(rpcresponse. class) / 將 rpc 響應(yīng)進行編碼(為了返回響應(yīng)) addlast (new rpchandler (handlermap) ; / 處理 rpc 請求).option(channeloption. so_backlog, 128).childoption(channeloption. so keepalive, true);str in g array 二 serveraddress. split(/:zz);string host = array0;int port = integ

28、er, parselnt (array1);channelfuture future = bootstrap.bi nd (host, port).sync (); logger, debug ('"server startcd on port port);if (serviceregistry != null) serviceregistry. register (serveraddress) ; / 注冊丿報務(wù) 地址future, channel (). closefuture (). sync (); finally workergroup. shutdowngrace

29、fully (); bossgroup. shutdowngraccfully();以上代碼中,有兩個重要的pojo需要描述一下,它們分別是rpcrequest與 rpcresponsco使用rpcrequest封裝rpc請求,代碼如下:package com. king, zkrpc;/* rpc請求*/public class rpcrequest private string requesttd;private string classname;private string methodname;private class<?> paramctcrtpcs;private o

30、bject parameters;publ ic string £etrequesttdo return rcqucstld;public void setrequestid (string requestld) this, requestld = requestld;public string getclassnamc() return classname;public void setclassname(stri ng classname) this.classname 二 classname;public string getmethodname() return method

31、name;public void setmethodname(string methodname) this. methodname 二 methodname;public class<?> getparametertypes() return parametertypes;public void setparametcrtypcs(class<?> parametcrtypcs) this.parametertypes = parametertypes;public object getparameters() return parameters;public voi

32、d setparameters(object parameters) this. parameters = parameters;使用rpcresponse封裝rpc響應(yīng),代碼如下:package com. king, zkrpe;/* rpc響應(yīng)*/public class rpcresponse private string requestid;private throwablc error;private object result;publ ic st ring get requesttdo return rcqucstld;public void setrequestld(strin

33、g requestld) this.requestld = requestld;public throwab1e geterror() return error;public void seterror(throwable error) this, error = error;public object getresult() return result;public void setresult(object resuit) this, result 二 result;使用 rpcdecoder 提供 rpc 解碼,只需擴展 netty 的 bytetomessagedecoder 抽象類的

34、decode方法即可,代碼如下:package com. king, zkrpe;import ty. buffer. bytebuf;import ty. channel. channelhandlercontext;import ty. handler. codec. bytetomessagedecoder;import java.util. list;/* rpc解碼*/publ ic class rpcdecoder extends bytetomessagedecoder private class<?> genericclass;public rpcdecoder(c

35、lass<?> genericclass) this.genericclass = genericclass;©overridepublic void decode(channelhandlercontext ctx, bytebuf in, list<object> out) tbrows exception if (in. rcadablcbytcs () < 4) return;in. markreaderindex ();int datalength = in. readtnt(); if (datalcngth < 0) ctx. clo

36、se ();if (inreadablebytes() < datalength) in.resetreadertndex();rcturn;byte data = new bytedatalength;in.readbytes(data);object obj 二 scrializationut訂.deserialize (datei, genericclass); out. add (obj);使用 rpcencoder 提供 rpc 編碼,只需擴展 netty 的 messagetobyteencoder 抽象類的encode方法即可,代碼如下:import import impo

37、rtpackage com. king, zkrpe;io. netty. buffer. bytebuf;io. netty. channel. channelhandlercontext;ty.hemdlcr. codec. mcssagctobytcencodcr;/* rpc編碼*/public class rpcencoder extends messagetobyteencoder private class<?> gencricclass;public rpcencoder(class<?> genericclass) this.genericclass

38、二 genericclass;©overridepublic void encode(channelhandlercontext ctx, object in, bytebuf out) throws exception if (genericclass. isinstance(in) bytc data = serializationut訂 serialize(in); out. writelnt(data. length);out. writebytes(data);編寫一個serializationutil工具類,使用protostuff實現(xiàn)序列化:package com. k

39、ing, zkrpe;import com.dyuproject. protostuff. linkedbuffer;import com. dyuproject. protostuff. protostufflout訂;import com. dyuproject. protostuff. schema;import com dyuproject. protostuff, runtime. runtimeschema;import org.objenesis. objenesis;import org.objenesis. objenesisstd;import java.util .map

40、;import java.util.concurrent. concurrenthashmap;/* protostuff序列化與反序列化丄貝*/public class serializationutil private static map<class<?>, schema<?>> cachedschema = new concurrcntilashmapo ();private static objenesis objenesis = new objenesisstd(true);private serializationutil() supprcss

41、warnings(unchecked") private static <t> schema<t> getschema(class<t> cis) schema<t> schema 二(schema<t>) cachedschema. get (cis); if (schema = null) schema = runtimeschema. createfrom(cls);if (schema != null) cachedschema. put(cis, schema);return schema;©suppre

42、sswarnings("unchecked") public static <t> byte serialize(t obj) class<t> cis 二(class<t>) obj. getclass();linkcdbuffcr buffer =linkedbuffer. allocate(linkedbuffer. default_buffer_s1ze);try schema<t> schema 二 getschema(cis);return protostufftouti1. tobytearray(obj, sc

43、hema, buffer); catch (exception e) throw new lllegalstateexception(e. getmessage(), e); finally buffer, clear ();public static <t> t deserialize(byte data, class<t> cis) try t message = (t) objenesis. newtnstance(cls); schcma<t> schema 二 getschema(cis);protostuff10util. mergefrom(d

44、ata, message, schema); return message; catch (exception e) throw new t1 legalstateexception(e.getmessage(), e);以上了使用objenesis來實例化對象,它是比java反射更加強大。 注意:如需要替換其它序列化框架,只需修改serializationuti 1即可。當然, 更好的實現(xiàn)方式是提供配置項來決定使用哪種序列化方式。使用rpchandler中處理rpc請求,只需擴展netty的 simplechannellnboundhandler 抽象類即可,代碼如下:package com

45、. king, zkrpc;import ty. channel. channelfuturelistener;import ty. channel. channelhandlercontext;import ty. channel. simplechanneltnboundhandler;import net.sf. eglib. reflect. fastclass;import net. sf. eglib. reflect. fastmethod;import org. slf4j. logger;import org. slfdj. loggerfactory;import java

46、. util. map;/* rpc服務(wù)端:請求處理過程*/public class rpcllandlcr extends simplcchanncllnboundiiandlcr<rpcrcqucst>private static final logger logger 二 loggerfactory. getlogger(rpchandler. class);private final map<string, object> handlermap;public rpchandler(map<string, object> handlermap) thi

47、 s.handlermap = handlermap;©overridepublic void channelreado(final channelhandlercontext ctx, rpcrequest request) tbrows exception rpcrcsponsc rcsponsc 二 new rpcrcsponsc (); response.setrequestld(request. getrequestld(); try object result 二 handie (request); response. setresult(resuit); catch (

48、throwable t) response. seterror (t);ctx. writeandflush(response). addlistener(channelfuturelistener. close);private object hem die (rpcrcqucst request) throws throwablc string classname = requestgetclasswame ();object servicebean 二 handlermap. get(classname);class<?> serviceclass = servicebean

49、. getclass();string mcthodnamc 二 request, gctmcthodmeimc ();class<?> parametertypes = request. getparametertypes (); object parameters 二 request. getparameters();/ method method = serviceclass. getmethod(methodname, paramctcrtpcs);/ method. setaccessible(true);/ return method, invoke(servicebe

50、an, parameters);fastclass servicefastclass = fastclass. create(serviceclass); fastmcthod scrviccfastmcthod 二servicefastclass. getmethod (methodname, parametertypes);return servicefastmethod. invoke(servicebean, parameters);©overridepublic void exceptioncaught(channelhandlercontext ctx, throwabl

51、e cause)loggereitor("server caught exception,cause); ctx. close();為了避免使用java反射帶來的性能問題,我們可以使用cglib提供的反射 api,如上面用到的 fastclass 與 fastmethodo同樣使用spring配置文件來配置rpc客戶端,spring-zk-rpe-client, xml代 <?xml version二1. 0 encoding二"utf-8?><beans xmlns=/http:/www. /schema/beans

52、" xmlns:xsi=/,http:/www. w3. org/2001 /xmlschema-instance xmlns:context二http:/www. springframework. org/schcma/contcxtz, xsi: schemalocation=/zhttp:/www. springframework, org/schema/beanshttp:/www. springframework. org/schema/beans/spring-beans3.0.xsd http:/www. springframework, org/schema/cont

53、exthttp:/www. springframework. org/schcma/contcxt/spring-contcxt-3 0. xsd ><context :component-scan base-package二com. king. zkrpcz/><contcxt:property-placeholderlocation=/zclasspath:rpc-client-config. properties"/<!-配置服務(wù)發(fā)現(xiàn)組件><bean i d二servi cedi scovery"class二com. king

54、, zkrpe. serviccdiscoveryz,><constructorarg name=,registryaddressz,value=,z$ registry, address /></bean><!配置rpc代理><bean id二rpcproxy" class二com. king, zkrpe. rpcproxy,z><construetor-arg name二servicediscovery"ref=/zservicediscoveryz/></bean></bcans&g

55、t;-其屮 rpc-client-config, properties 提供了具體的配置:<!- lang: java -># zookeeper 服務(wù)器registry, address二 127. 0. 0. 1:21818第八步:實現(xiàn)服務(wù)發(fā)現(xiàn)同樣使用zookeeper實現(xiàn)服務(wù)發(fā)現(xiàn)功能,見如下代碼: package com. king, zkrpe;import org. apache, zookeeper. keeperexception; import org. apache, zookeeper. watchedevent; import org. apache, zoo

56、keeper. watcher; import org. apache, zookeeper. zookeeper;import org.slf4j. logger;import org. slf4j. loggcrfactor);import java. io. ioexception;import java. util. arraylist;import java, uti l.list;import java. util, concurrent. countdownlatch; import java.util.concurrent. threadlocalrandom;/*服務(wù)發(fā)現(xiàn):連

57、接zk,添加weitch事件*/public class servicediscovery private static final logger logger 二 loggerfactory. getlogger(servicediscovery. class);private countdownlatch latch = new countdownlatch(1);private volatile list<string> datalist 二 new arraylisto();private string registryaddrcss;public servicediscovery(string registryaddress) this.registryaddress 二 reg

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論