版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
1、輕量級分布式rpc框架編程 開發(fā)技術(shù)輕量級分布式rpc框架原文出處:黃勇rpc,即remote procedure cal 1 (遠程過程調(diào)用),說得通俗一點就是:調(diào)用 遠程計算機上的服務(wù),就像調(diào)用本地服務(wù)一樣。rpc可基于http或tcp協(xié)議,web service就是基于http協(xié)議的rpc,它 具有良好的跨平臺性,但其性能卻不如基于tcp協(xié)議的rpco會兩方面會直接 影響rpc的性能,一是傳輸方式,二是序列化。眾所周知,tcp是傳輸層協(xié)議,http是應(yīng)用層協(xié)議,而傳輸層較應(yīng)用層更加底 層,在數(shù)據(jù)傳輸方面,越底層越快,因此,在一般情況f, tcp 一定比http快。 就序列化而言,java
2、捉供了默認的序列化方式,但在高并發(fā)的情況下,這種方 式將會帶來一些性能上的瓶頸,于是市面上出現(xiàn)了一系列優(yōu)秀的序列化框架,比 如:protobuf> kryo> hessian> jackson等,它們可以取代java默認的序列 化,從而提供更高效的性能。為了支持高并發(fā),傳統(tǒng)的阻塞式10顯然不太合適,因此我們需要異步的10, 即nioo java提供了 ni0的解決方案,java 7也提供了更優(yōu)秀的ni0. 2支持, 用java實現(xiàn)ni0并不是遙不可及的事情,只是需要我們熟悉ni0的技術(shù)細 節(jié)。我們需要將服務(wù)部署在分布式壞境下的不同節(jié)點上,通過服務(wù)注冊的方式,讓客 戶端來自動發(fā)
3、現(xiàn)當前可用的服務(wù),并調(diào)用這些服務(wù)。這需要一種服務(wù)注冊表(service registry)的組件,讓它來注冊分布式環(huán)境下所有的服務(wù)地址(包括: 主機名與端口號)。應(yīng)用、服務(wù)、服務(wù)注冊表之間的關(guān)系見下圖:每臺server上可發(fā)布多個service,這些service共用一個host與port, 在分布式環(huán)境下會提供server共同對外提供service。此外,為防止service registry ;1;現(xiàn)單點故障,因此需要將其搭建為集群環(huán)境。木文將為您揭曉開發(fā)輕量級分布式rpc框架的具體過程,該框架基于tcp協(xié) 議,提供了 nto特性,提供高效的序列化方式,同時也具備服務(wù)注冊與發(fā)現(xiàn)的 能力。根
4、據(jù)以上技術(shù)需求,我們可使用如卜技術(shù)選型:1.spring:它是最強大的依賴注入框架,也是業(yè)界的權(quán)威標準。2. netty:它使n10編程更加容易,屏蔽了 java底層的nio細節(jié)。3. protostuff:它基于protobuf序列化舟£架,面向pojo,無需編寫.proto文件。4. zookccpcr:提供服務(wù)注冊與發(fā)現(xiàn)功能,開發(fā)分布式系統(tǒng)的必備選擇,同時它也具備天生的集群能力。1第一步:編寫服務(wù)接口package com.king. zkrpc;/*定義服務(wù)接口*/public interface helloservice string hello(string name);
5、將該接口放在獨立的客戶端jar包屮,以供應(yīng)用使用。2第二步:編寫服務(wù)接口的實現(xiàn)類package com. king, zkrpc;/*實現(xiàn)服務(wù)接口*/rpcservice(helloservice. class) / 指定遠程接口 public class helloservicetmpl implements helloservice 0vcrridepublic string hello(string name) return "hello! " + name;使用rpcservice注解定義在服務(wù)接口的實現(xiàn)類上,需要對該實現(xiàn)類指定遠程接 口,因為實現(xiàn)類可能會實現(xiàn)多個接
6、口,一定要告訴框架哪個才是遠程接口。rpcservice代碼如下:package com. king. zkrpc;import org springframework stcrcotypc. component;import java.lang.annotation. elementtype;import java. 1 ang. armotalion. relention;import java lang, annotation. retent ionpolicy;import java lang annotation. targct;/* rpc接口注解*/target (element
7、type. type)©retention(retentionpolicy. runtime)©component /標明可被spring掃描 public inlerface rpcservice class<?> value ();該注解具備spring的component注解的特性,可被spring掃描。該實現(xiàn)類放在服務(wù)端jar包屮,該jar包還捉供了一些服務(wù)端的配置文件與啟 動服務(wù)的引導(dǎo)程序。3第三步:配置服務(wù)端服務(wù)端spring配置文件名為spring-zk-rpc-server. xml,內(nèi)容如下:<?xml version二"1.
8、0 encoding二utf-8"?><beans xmlns二http:/www. springframework. org/schema/beans/z xmlns: xsi=/http: /www. w3. org/2001/xmlschema-instance/z xmlns:context二http:/www. springframework. org/schema/contextxsi:schcmalocation二http:/www. springframework. org/schcma/bcanshttp:/www. springframework or
9、g/schema/bedns/springbedns-3. 0. xsd http:/www. springframework. org/schema/context http:/www. springframework. org/schema/context/spring-context-3.0.xsd><!配置自動掃包-><context: component-scan base-package二com. king. zkrpc/z/><context:property-placeholderlocation二classpath:rpc-server-c
10、onfig. propertics/><!-配置服務(wù)注冊組件-><bean id二serviceregistry" class二com. king, zkrpe. serviceregistryzz> <constructor-arg name=/zregi stryaddress"value二$registry. address/></bean><!-配置rpc服務(wù)器一><bean id二rpcserver cl ass二com. king, zkrpe. rpcserver><cons
11、true tor-arg n amc=,scrvcraddrcss/,value二$server, address/><cons true tor-arg name=/ serv i cereg i s try/z ref=/zserviceregistry/z/></bean></bcans>具體的配置參數(shù)在rpc-server-config. properties文件屮,內(nèi)容如下: <!- lang: java -># zookeeper 服務(wù)器registry, address二 127. 0. 0. 1:2181# rpc服務(wù)器se
12、rver, address二 127. 0. 0. 1:8000以上配置表明:連接本地的zookeeper服務(wù)器,并在8000端口上發(fā)布rpc服 務(wù)。4第四步:啟動服務(wù)器并發(fā)布服務(wù)為了加載spring配置文件來發(fā)布服務(wù),只需編寫一個引導(dǎo)程序即可:package com. king, zkrpc;importorg. springframcwork. contcxt. support. classpathxmlapplicationcontext;/* rpc服務(wù)啟動入口*/public class rpcbootstrap public static void main(string args
13、) newclasspathxmlapplicationcontext(spring-zkrpc_server. xml);運行rpcbootstrap類的main方法即可啟動服務(wù)端,但還有兩個重要的組件l'曲未 實現(xiàn),它們分別是:serviceregistry與rpcserver,下文會給出具體實現(xiàn)細節(jié)。5第五步:實現(xiàn)服務(wù)注冊使用zookeeper客戶端可輕松實現(xiàn)服務(wù)注冊功能,serviceregistry代碼如下: package com. king, zkrpc;import org.apache, ookeeper*;import org.slf4j. logger;impor
14、t org.slf4j. loggerfactory;import java. io. ioexception;import java. util, concurrent. countdownlatch;/*連接zk注冊中心,創(chuàng)建服務(wù)注冊目錄*/public class serviceregistry private static final logger logger =loggerfactory. getlogger(serviceregistry. class);private countdownlatch latch = new countdownlatch(1);private st
15、ring registryaddress;public serviceregistry(string registryaddress) thi s.regi stryaddress = regi stryaddress;public void register (string data) if (data !二 null) zookeeper zk = connectserver();if (zk != null) createnode(zk, data);private zookeeper connectserver() zookeeper zk = null;try zk = new zo
16、okeeper (regi stryaddress,constant. zk_session_timeout, new watcher() ©overridepublic void process (watchedevent event) /判斷是否已連接zk,連接后計數(shù)器遞減. if (event. getstate()二二event. keeperstate. syncconnected) latch, count dow n(););/若計數(shù)器不為0,則等待. latch, await (); catch (toexception | tnterruptedexception
17、e) logger, error e);return zk;private void crcatenodc(zookccpcr zk, string data) try byte bytes = data. getbytes ();string path 二 zk.create (constant. zk data path, bytes, zoodefs.ids. open_acl_unsafe, createmode. ephemeral_sequenttal);logger, debug(,zcrcatc zookccper node ( => )", path, dat
18、a); catch (keeperexception | interruptedexception e) logger, error e);其中,通過constant配置了所有的常量:package com. king, zkrpc;/* zk相關(guān)常量*/public interface constant int zk_session_timeout = 5000;string zk_registry_path = "/registry"string zk_data_path 二 zk_registry_path + /data;注意:首先需要使用zookccpcr客戶端命
19、令行創(chuàng)建/registry永久節(jié)點,用于 存放所冇的服務(wù)臨時節(jié)點。6第六步:實現(xiàn)rpc服務(wù)器使用netty可實現(xiàn)一個支持ni0的rpc服務(wù)器,需要使用serviceregistry 注冊服務(wù)地址,rpcserver代碼如下:package com. king, zkrpc;import io. netty. bootstrap. serverbootstrap;import ty. channel. channel future;import io. nctty. charincl charincllnitializcr;import ty. channel. channeloption;im
20、port ty.channel. eventloopgroup;import io. netty. channel. nio. nioeventloopgroup;import ty. channel. socket. socketchannel;import io. netty. charmcl socket, nio. nioscrvcrsockctcharmcl;import org.apache, commons. collections4. maputils;import org. slf4j. logger;import org. slfdj. loggerfactory;impo
21、rt org. springframework. beans. beansexception;impor t org. spr in gframework. boa ns. fac to ry. initi alizi ngbcei n; import org.springframework. context. applicationcontext; import org. springframework, context. applicationcontextaware;import java. util. hashmap;import java. util. map;/*啟動并注冊服務(wù)*/
22、public class rpcscrvcr implcments applicationcontcxtawarc, initializingbean private static final logger logger 二loggerfactory. getlogger(rpcserver. class);private string serveraddress;private serviceregistry serviceregistry;private map<string, object> handlermap = new hashmapo () ; / 存放 接口名與服務(wù)
23、對象z間的映射關(guān)系public rpcserver(string serveraddress) this.serveraddress 二 serveraddress;public rpcserver(string serveraddress, serviceregistry serviceregistry) this.serveraddress 二 serveraddress;this.serviceregistry 二 serviceregistry;©overridepublic void setapplicationcontext(applicationcontext ctx)
24、 throws beansexccption map<string, object> servicebeanmap =ctx. getbeanswithannotation (rpcservice. class) ; / 獲取所有帶有 rpcservice 注解的 spring beanif (maputi1s. isnotempty(servicebeanmap) for (object serviccbcan : serviccbcanmap. values () string interfacename =servicebean. getclass (). getannota
25、tion (rpcservice. class). value(). getnam e();handlermap. put(interfacename, servicebean);©overridepublic void aftcrpropcrticssct() throws exccption eventloopgroup bossgroup = new nioeventloopgroupo ; eventloopgroup workergroup 二 new nioeventloopgroupo ; try serverbootstrap bootstrap = new serv
26、erbootstrap(); bootstrap, group(bossgroup,workergroup). channel (nioserversocketchannel. class)childhandler(newchannellnitializersocketchannel>() ©override public void initcharrnc1(sockctcharrncl charmcl) throws exception channel, pipeline() addlast (newrpcdecoder(rpcrequest. class) / 將 rpc
27、請求進行解碼(為了處理請求) addlast (newrpcencoder(rpcresponse. class) / 將 rpc 響應(yīng)進行編碼(為了返回響應(yīng)) addlast (new rpchandler (handlermap) ; / 處理 rpc 請求).option(channeloption. so_backlog, 128).childoption(channeloption. so keepalive, true);str in g array 二 serveraddress. split(/:zz);string host = array0;int port = integ
28、er, parselnt (array1);channelfuture future = bootstrap.bi nd (host, port).sync (); logger, debug ('"server startcd on port port);if (serviceregistry != null) serviceregistry. register (serveraddress) ; / 注冊丿報務(wù) 地址future, channel (). closefuture (). sync (); finally workergroup. shutdowngrace
29、fully (); bossgroup. shutdowngraccfully();以上代碼中,有兩個重要的pojo需要描述一下,它們分別是rpcrequest與 rpcresponsco使用rpcrequest封裝rpc請求,代碼如下:package com. king, zkrpc;/* rpc請求*/public class rpcrequest private string requesttd;private string classname;private string methodname;private class<?> paramctcrtpcs;private o
30、bject parameters;publ ic string £etrequesttdo return rcqucstld;public void setrequestid (string requestld) this, requestld = requestld;public string getclassnamc() return classname;public void setclassname(stri ng classname) this.classname 二 classname;public string getmethodname() return method
31、name;public void setmethodname(string methodname) this. methodname 二 methodname;public class<?> getparametertypes() return parametertypes;public void setparametcrtypcs(class<?> parametcrtypcs) this.parametertypes = parametertypes;public object getparameters() return parameters;public voi
32、d setparameters(object parameters) this. parameters = parameters;使用rpcresponse封裝rpc響應(yīng),代碼如下:package com. king, zkrpe;/* rpc響應(yīng)*/public class rpcresponse private string requestid;private throwablc error;private object result;publ ic st ring get requesttdo return rcqucstld;public void setrequestld(strin
33、g requestld) this.requestld = requestld;public throwab1e geterror() return error;public void seterror(throwable error) this, error = error;public object getresult() return result;public void setresult(object resuit) this, result 二 result;使用 rpcdecoder 提供 rpc 解碼,只需擴展 netty 的 bytetomessagedecoder 抽象類的
34、decode方法即可,代碼如下:package com. king, zkrpe;import ty. buffer. bytebuf;import ty. channel. channelhandlercontext;import ty. handler. codec. bytetomessagedecoder;import java.util. list;/* rpc解碼*/publ ic class rpcdecoder extends bytetomessagedecoder private class<?> genericclass;public rpcdecoder(c
35、lass<?> genericclass) this.genericclass = genericclass;©overridepublic void decode(channelhandlercontext ctx, bytebuf in, list<object> out) tbrows exception if (in. rcadablcbytcs () < 4) return;in. markreaderindex ();int datalength = in. readtnt(); if (datalcngth < 0) ctx. clo
36、se ();if (inreadablebytes() < datalength) in.resetreadertndex();rcturn;byte data = new bytedatalength;in.readbytes(data);object obj 二 scrializationut訂.deserialize (datei, genericclass); out. add (obj);使用 rpcencoder 提供 rpc 編碼,只需擴展 netty 的 messagetobyteencoder 抽象類的encode方法即可,代碼如下:import import impo
37、rtpackage com. king, zkrpe;io. netty. buffer. bytebuf;io. netty. channel. channelhandlercontext;ty.hemdlcr. codec. mcssagctobytcencodcr;/* rpc編碼*/public class rpcencoder extends messagetobyteencoder private class<?> gencricclass;public rpcencoder(class<?> genericclass) this.genericclass
38、二 genericclass;©overridepublic void encode(channelhandlercontext ctx, object in, bytebuf out) throws exception if (genericclass. isinstance(in) bytc data = serializationut訂 serialize(in); out. writelnt(data. length);out. writebytes(data);編寫一個serializationutil工具類,使用protostuff實現(xiàn)序列化:package com. k
39、ing, zkrpe;import com.dyuproject. protostuff. linkedbuffer;import com. dyuproject. protostuff. protostufflout訂;import com. dyuproject. protostuff. schema;import com dyuproject. protostuff, runtime. runtimeschema;import org.objenesis. objenesis;import org.objenesis. objenesisstd;import java.util .map
40、;import java.util.concurrent. concurrenthashmap;/* protostuff序列化與反序列化丄貝*/public class serializationutil private static map<class<?>, schema<?>> cachedschema = new concurrcntilashmapo ();private static objenesis objenesis = new objenesisstd(true);private serializationutil() supprcss
41、warnings(unchecked") private static <t> schema<t> getschema(class<t> cis) schema<t> schema 二(schema<t>) cachedschema. get (cis); if (schema = null) schema = runtimeschema. createfrom(cls);if (schema != null) cachedschema. put(cis, schema);return schema;©suppre
42、sswarnings("unchecked") public static <t> byte serialize(t obj) class<t> cis 二(class<t>) obj. getclass();linkcdbuffcr buffer =linkedbuffer. allocate(linkedbuffer. default_buffer_s1ze);try schema<t> schema 二 getschema(cis);return protostufftouti1. tobytearray(obj, sc
43、hema, buffer); catch (exception e) throw new lllegalstateexception(e. getmessage(), e); finally buffer, clear ();public static <t> t deserialize(byte data, class<t> cis) try t message = (t) objenesis. newtnstance(cls); schcma<t> schema 二 getschema(cis);protostuff10util. mergefrom(d
44、ata, message, schema); return message; catch (exception e) throw new t1 legalstateexception(e.getmessage(), e);以上了使用objenesis來實例化對象,它是比java反射更加強大。 注意:如需要替換其它序列化框架,只需修改serializationuti 1即可。當然, 更好的實現(xiàn)方式是提供配置項來決定使用哪種序列化方式。使用rpchandler中處理rpc請求,只需擴展netty的 simplechannellnboundhandler 抽象類即可,代碼如下:package com
45、. king, zkrpc;import ty. channel. channelfuturelistener;import ty. channel. channelhandlercontext;import ty. channel. simplechanneltnboundhandler;import net.sf. eglib. reflect. fastclass;import net. sf. eglib. reflect. fastmethod;import org. slf4j. logger;import org. slfdj. loggerfactory;import java
46、. util. map;/* rpc服務(wù)端:請求處理過程*/public class rpcllandlcr extends simplcchanncllnboundiiandlcr<rpcrcqucst>private static final logger logger 二 loggerfactory. getlogger(rpchandler. class);private final map<string, object> handlermap;public rpchandler(map<string, object> handlermap) thi
47、 s.handlermap = handlermap;©overridepublic void channelreado(final channelhandlercontext ctx, rpcrequest request) tbrows exception rpcrcsponsc rcsponsc 二 new rpcrcsponsc (); response.setrequestld(request. getrequestld(); try object result 二 handie (request); response. setresult(resuit); catch (
48、throwable t) response. seterror (t);ctx. writeandflush(response). addlistener(channelfuturelistener. close);private object hem die (rpcrcqucst request) throws throwablc string classname = requestgetclasswame ();object servicebean 二 handlermap. get(classname);class<?> serviceclass = servicebean
49、. getclass();string mcthodnamc 二 request, gctmcthodmeimc ();class<?> parametertypes = request. getparametertypes (); object parameters 二 request. getparameters();/ method method = serviceclass. getmethod(methodname, paramctcrtpcs);/ method. setaccessible(true);/ return method, invoke(servicebe
50、an, parameters);fastclass servicefastclass = fastclass. create(serviceclass); fastmcthod scrviccfastmcthod 二servicefastclass. getmethod (methodname, parametertypes);return servicefastmethod. invoke(servicebean, parameters);©overridepublic void exceptioncaught(channelhandlercontext ctx, throwabl
51、e cause)loggereitor("server caught exception,cause); ctx. close();為了避免使用java反射帶來的性能問題,我們可以使用cglib提供的反射 api,如上面用到的 fastclass 與 fastmethodo同樣使用spring配置文件來配置rpc客戶端,spring-zk-rpe-client, xml代 <?xml version二1. 0 encoding二"utf-8?><beans xmlns=/http:/www. /schema/beans
52、" xmlns:xsi=/,http:/www. w3. org/2001 /xmlschema-instance xmlns:context二http:/www. springframework. org/schcma/contcxtz, xsi: schemalocation=/zhttp:/www. springframework, org/schema/beanshttp:/www. springframework. org/schema/beans/spring-beans3.0.xsd http:/www. springframework, org/schema/cont
53、exthttp:/www. springframework. org/schcma/contcxt/spring-contcxt-3 0. xsd ><context :component-scan base-package二com. king. zkrpcz/><contcxt:property-placeholderlocation=/zclasspath:rpc-client-config. properties"/<!-配置服務(wù)發(fā)現(xiàn)組件><bean i d二servi cedi scovery"class二com. king
54、, zkrpe. serviccdiscoveryz,><constructorarg name=,registryaddressz,value=,z$ registry, address /></bean><!配置rpc代理><bean id二rpcproxy" class二com. king, zkrpe. rpcproxy,z><construetor-arg name二servicediscovery"ref=/zservicediscoveryz/></bean></bcans&g
55、t;-其屮 rpc-client-config, properties 提供了具體的配置:<!- lang: java -># zookeeper 服務(wù)器registry, address二 127. 0. 0. 1:21818第八步:實現(xiàn)服務(wù)發(fā)現(xiàn)同樣使用zookeeper實現(xiàn)服務(wù)發(fā)現(xiàn)功能,見如下代碼: package com. king, zkrpe;import org. apache, zookeeper. keeperexception; import org. apache, zookeeper. watchedevent; import org. apache, zoo
56、keeper. watcher; import org. apache, zookeeper. zookeeper;import org.slf4j. logger;import org. slf4j. loggcrfactor);import java. io. ioexception;import java. util. arraylist;import java, uti l.list;import java. util, concurrent. countdownlatch; import java.util.concurrent. threadlocalrandom;/*服務(wù)發(fā)現(xiàn):連
57、接zk,添加weitch事件*/public class servicediscovery private static final logger logger 二 loggerfactory. getlogger(servicediscovery. class);private countdownlatch latch = new countdownlatch(1);private volatile list<string> datalist 二 new arraylisto();private string registryaddrcss;public servicediscovery(string registryaddress) this.registryaddress 二 reg
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 工程建設(shè)項目招標合同樣本
- 住宅室內(nèi)設(shè)計施工合同
- 住宅建造合同模板
- 電梯設(shè)備安裝與定期檢修協(xié)議
- 上海市內(nèi)銷商品房出售合同
- 2024年個人土地交易合同模板
- 2024意外傷害賠償協(xié)議書范例
- 影視廣告制作合同
- 合伙協(xié)議與法律規(guī)定沖突時的解決途徑
- 2024年技師合同書
- 無償劃轉(zhuǎn)國有股權(quán)及資產(chǎn)的可行性論證報告(附無償劃轉(zhuǎn)協(xié)議)
- 公務(wù)車司機年度工作總結(jié) 公務(wù)用車駕駛員個人總結(jié)
- 第二版《高中物理題型筆記》上冊
- 上海市大學(xué)生安全教育(2022級)學(xué)習(xí)通課后章節(jié)答案期末考試題庫2023年
- 蘇軾生平及創(chuàng)作整理
- 柴油發(fā)電機組應(yīng)急預(yù)案
- 語文《猜猜他是誰》教案
- 繪本:讓誰先吃好呢
- 寬容待人正確交往中小學(xué)生教育主題班會
- 移動通信網(wǎng)絡(luò)運行維護管理規(guī)程
- 龍頭股戰(zhàn)法優(yōu)質(zhì)獲獎?wù)n件
評論
0/150
提交評論