访问端流量控制
博主面临的一个问题是,如何对该esb的调用者调用某个服务的次数进行控制。
也就是说:对于服务A,每个调用者调用该服务单位时间内调用次数不能超过阈值;反过来也一样,对于调用者,调用任意服务单位时间内的调用次数不能超过阈值。
那么此时,根据访问端的ip地址来标识某个调用源 origin,用服务的接口名srvName来标识调用的那个服务。
对于sentinel最基本的用法,参考:
http://blog.abreaking.com/c/分布式流量控制框架sentinel初探
加入依赖
同样的,需要加入sentinel依赖
com.alibaba.csp
sentinel-core
0.2.1-SNAPSHOT
定义规则
那么此时,规则就不止一个了。我们改变一下定义规则的方法:
public void addRule(String resource, String origin) {
List rules = FlowRuleManager.getRules();
if (rules == null) {
rules = new ArrayList();
}
FlowRule rule = new FlowRule();
rule.setResource(resource); //资源名
rule.setGrade(RuleConstant.FLOW_GRADE_QPS); //限流阈值类型,此处为qps类型
rule.setCount(8); //限流阈值,表示每秒钟通过n次请求
rule.setLimitApp(origin); //对调用端进行控制
rules.add(rule);
FlowRuleManager.loadRules(rules);
}
如上,如果有新的规则, 直接放在FlowRuleManager中的规则List中去。限流阈值等先是写死的。
定义资源resource,调用端origin
为了模拟调用者调用那个服务这种情况,博主使用一种简单的方式来模拟:
通过url请求带参数来模拟,比如:客户端访问:http://127.0.0.1:8080/cc?srvName=hello。就表示:ip为127.0.0.1的调用端调用hello这个方法。
首先,需要定义好resource,origin。
public void addRule(String resource, String origin) {
String host = request.getRemoteHost();
String srvName = request.getParameter("srvName");
String resource = host + srvName;
String origin = host;
}
而后,就是初始化规则的问题。此时注意,并不是每次请求过来都要进行初始化,导致程序变慢不说,这一次的请求如果与上一次的请求不同源,会将上一次定义的规则给覆盖掉。
所以,定义一个List resources存放resource,只有不同的调用端第一次请求时,才会添加新的规则到规则列表中。
if (!resources.contains(resource)) {
synchronized (resources) {
if (!resources.contains(resource)) {
addRule(resource, origin);
resources.add(resource);
}
}
}
整体Demo
最后,使用包围住需要进行流量控制代码块的。
@RequestMapping
@("/cc")
public String controlClient(HttpServletRequest request) {
String host = request.getRemoteHost();
String srvName = request.getParameter("srvName");
String resource = host + srvName;
String origin = host;
if (!resources.contains(resource)) {
synchronized (resources) {
/**by liwei
* * springmvc是单例模式,这里就没必要进行双重校验
*如果是非单例的servlet,要加上双重校验,防止重复添加了规则
*/
if (!resources.contains(resource)) {
addNewRule4NewOrigin(resource, origin);
resources.add(resource);
}
}
}
String response = "";
Entry entry = null;
try {
ContextUtil.enter(resource, origin);
entry = SphU.entry(resource);
//我的代码块
response = execSrv(resource);
} catch (BlockException e1) {
//直接将异常打出来
e1.printStackTrace();
response = resource + e1.toString();
} finally {
if (entry != null) {
entry.exit();
}
ContextUtil.exit();
}
return response;
}
测试
写个测试方法,
@Test
public void test() throws InterruptedException {
String host1 = "127.0.0.1";
String host2 = "192.168.236.1";
String srv = "hello";
for (int i = 0; i < 5; i++) {
String url = "http://" + host1 + ":8080/cc?srvName=" + srv + i;
new Thread(new MyRunnable(url)).start();
}
for (int i = 0; i < 5; i++) {
String url = "http://" + host2 + ":8080/cc?srvName=" + srv + i;
new Thread(new MyRunnable(url)).start();
}
Thread.sleep(2000);
}
class MyRunnable implements Runnable {
String url;
public MyRunnable(String url) {
this.url = url;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
String s = doPost(url);
System.out.println(s);
}
}
}
private String doPost(String url) {
HttpClient client = new DefaultHttpClient();
//发送get请求
HttpGet request = new HttpGet(url);
try {
HttpResponse response = client.execute(request);
String strResult = EntityUtils.toString(response.getEntity());
return strResult;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
发表评论