-
前言回调函数在编程中是一种常见的设计模式,它允许一个函数在特定的时刻或条件下调用另一个函数。在Java中,我们可以通过接口和匿名内部类实现回调函数。本文将详细介绍Java中的回调函数,并提供相关代码示例。一、回调函数的概念回调函数是一种将函数作为参数传递给另一个函数的方法。当特定事件或条件发生时,被传递的函数将被调用。这种方式可以让我们在不修改原有代码的情况下,灵活地扩展和定制功能。这种设计模式在许多编程语言中都有应用,它的主要优点是提高了代码的模块化程度和可重用性。二、Java中的回调函数实现在Java中,我们可以通过接口和实现接口的类来实现回调函数。下面是一个简单的示例:定义一个回调接口:public interface Callback { void onCallback(String message); } 这个接口定义了一个名为onCallback的方法,该方法接受一个字符串作为参数。创建一个类,该类接受回调接口作为参数,并在特定条件下调用回调方法: public class Caller { private Callback callback; public Caller(Callback callback) { this.callback = callback; } public void doSomething() { // 执行一些操作... String message = "操作完成"; callback.onCallback(message); } } 在这个类中,我们定义了一个名为doSomething的方法。这个方法在执行一些操作后,会调用回调接口的onCallback方法。实现回调接口并创建Caller对象: public class Main { public static void main(String[] args) { Callback callback = new Callback() { @Override public void onCallback(String message) { System.out.println("回调函数被调用: " + message); } }; Caller caller = new Caller(callback); caller.doSomething(); } } 在这个例子中,我们创建了一个实现了Callback接口的匿名内部类,并将其传递给Caller类的构造函数。然后,我们调用Caller类的doSomething方法。当doSomething方法执行完毕后,它会调用我们传递给它的回调函数。三、使用Lambda表达式简化回调函数从Java 8开始,我们可以使用Lambda表达式简化回调函数的实现。以下是使用Lambda表达式的示例: public class Main { public static void main(String[] args) { Callback callback = message -> System.out.println("回调函数被调用: " + message); Caller caller = new Caller(callback); caller.doSomething(); } } 通过使用Lambda表达式,我们可以更简洁地实现回调函数,提高代码的可读性。四、回调函数的应用场景回调函数在Java中有许多应用场景。例如,我们可以使用回调函数来处理异步操作。在异步编程中,我们经常需要在某个操作完成后执行一些操作,但是我们无法预知这个操作何时完成。在这种情况下,我们可以使用回调函数。一个具体的例子是我们在springboot中使用RabbitMQ时,通常需要保障生产者投递消息的可靠性,rabbitmq为我们提供了这样一种方式,即生产者确认机制,这个机制就是利用回调函数实现的,当交换机收到生产者提供的消息之后,会调用我们实现的回调函数,然后我们可以在回调函数中实现一些自己的处理逻辑,从而实现发送者的可靠性。另一个常见的应用场景是在图形用户界面(GUI)编程中。在GUI编程中,我们经常需要在用户进行某些操作(如点击按钮)时执行一些操作。我们可以将这些操作封装在回调函数中,然后在用户进行操作时调用这些回调函数。五、回调函数的注意事项5.1接口设计合理设计回调接口,确保回调函数的参数和返回值类型与实际需求匹配,从而避免出现类型错误或不一致的问题。5.2. 空指针异常在使用回调函数时,需要注意空指针异常的处理。例如,在调用回调函数之前,需要进行空值检查,以确保回调函数的实例不为空。5.3. 逻辑复杂性当回调逻辑较为复杂时,可能会导致代码难以维护和理解。因此,在设计回调函数时,应尽量保持逻辑简洁明了,避免过于复杂的嵌套和逻辑判断。5.4. 性能影响在使用回调函数时,由于涉及到多个类之间的交互,可能会引入一定的性能开销。因此,在需要高性能的场景中,应谨慎使用回调函数,以避免性能影响。
-
1.介绍Collections.sort()方法的参数为一个List集合,用于给集合进行排序。Collections.sort()内部进行了方法重载,可以只传入一个List集合参数,也可以传入一个List集合参数和一个Comparator接口对象并实现其中的compare方法2.Comparator接口下的compare方法升序排列 public static void main(String[] args) { Integer[] nums = new Integer[]{3, 7, 9, 2, 1}; Arrays.sort(nums, new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { return o1 - o2; } }); for (Integer i : nums) { System.out.print(i + " "); // 1 2 3 7 9 } }降序排列public static void main(String[] args) { Integer[] nums = new Integer[]{3, 7, 9, 2, 1}; Arrays.sort(nums, new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { return o2 - o1; } }); for (Integer i : nums) { System.out.print(i + " ");9 7 3 2 1 } }所以更多时候我们是直接记住了compare(int o1, int o2)方法 return o1 - o2 是升序,return o2 - o1 是降序。为什么会这样写呢?我们不妨看一下sort(T[] a, Comparator<? super T> c)方法public static <T> void sort(T[] a, Comparator<? super T> c) { if (c == null) { sort(a); } else { if (LegacyMergeSort.userRequested) legacyMergeSort(a, c); else TimSort.sort(a, 0, a.length, c, null, 0, 0); } }可以看出他是进去了else内,不妨先进入legacyMergeSort看一下private static <T> void legacyMergeSort(T[] a, Comparator<? super T> c) { T[] aux = a.clone(); if (c==null) mergeSort(aux, a, 0, a.length, 0); else mergeSort(aux, a, 0, a.length, 0, c); }这里很明显也是进去了else内,继续看mergeSortprivate static void mergeSort(Object[] src,Object[] dest,int low, int high, int off,Comparator c) { int length = high - low; // Insertion sort on smallest arrays if (length < INSERTIONSORT_THRESHOLD) { for (int i=low; i<high; i++) for (int j=i; j>low && c.compare(dest[j-1], dest[j])>0; j--) swap(dest, j, j-1); return; } // Recursively sort halves of dest into src int destLow = low; int destHigh = high; low += off; high += off; int mid = (low + high) >>> 1; mergeSort(dest, src, low, mid, -off, c); mergeSort(dest, src, mid, high, -off, c); // If list is already sorted, just copy from src to dest. This is an // optimization that results in faster sorts for nearly ordered lists. if (c.compare(src[mid-1], src[mid]) <= 0) { System.arraycopy(src, low, dest, destLow, length); return; } // Merge sorted halves (now in src) into dest for(int i = destLow, p = low, q = mid; i < destHigh; i++) { if (q >= high || p < mid && c.compare(src[p], src[q]) <= 0) dest[i] = src[p++]; else dest[i] = src[q++]; } }这一段的代码关键就是如下部分 if (length < INSERTIONSORT_THRESHOLD) { for (int i=low; i<high; i++) for (int j=i; j>low && c.compare(dest[j-1], dest[j])>0; j--) swap(dest, j, j-1); return; } 可以看到这里面调用了compare方法,当方法的返回值大于0的时候就将数组的前一个数和后一个数做交换。以升序为例来讲解,升序的话compare方法就 return o1 - o2,那么就是 return dest[j-1] - dest[j]。当 dest[j-1] > dest[j] 时,就进行交换。当 dest[j-1] <= dest[j] 时位置不变,从而达到数组升序。降序也是一样的道理。
-
就 我想接一个文本翻译的API,给频道机器人加点功能。 但是只能用Java,调接口比较麻烦,在接API的时候发现要有一个"对应region的token" 请问这个token怎么获取的啊?看了半天文档没看明白,是要自己再调一个接口才能拿到吗? String token = "对应region的token"; HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("POST"); connection.setDoInput(true); connection.setDoOutput(true); connection.addRequestProperty("Content-Type", "application/json"); // 就这里 connection.addRequestProperty("X-Auth-Token", token);
-
重定向:1)什么是重定向? 服务器向浏览器发送一个状态码302 及一个消息头location(location 的值是一个地址),浏览器会立即向 location所指定的地址发送一个新的请求。我们把这样一种机制叫重定向。2)编程: response.sendRedirect(String url);3)需要注意的问题 在重定向之前,不能够有任何的输出;如果response 缓存当中有数据,在重定向之前,会自动清空。4)重定向的特点: a,地址任意 b,浏览器地址栏地址会变化(即变化为跳转之后的地址)。转发:1)什么是转发? 一个web组件(servlet/jsp)将未完成的处理交给另外一个web组件继续完成。转发所涉的各个web组件可以共享request和response 对象。 2)编程 step1 绑定数据到request对象上。 step2 获得转发器 RequestDispatcher rd = request.getRequestDispatcher(String url); step3 转发 rd.forward(request,response); servlet:负责业务逻辑处理(包括数据访问) 。 jsp:负责生成界面。 3)需要注意的问题: 在转发之前,response缓存的数据会被清空。
-
所谓生命周期,指的是servlet容器如何创建servlet实例、分配其资源、调用其方法、并销毁其实例的整个过程。阶段一: 实例化(就是创建servlet对象,调用构造器)在如下两种情况下会迚行对象实例化。第一种情况: 当请求到达容器时,容器查找该servlet对象是否存在,如果不存在,才会创建实例。第二种情况:容器在启动时,或者新部署了某个应用时,会检查web.xml当中,servlet是否有 load-on-starup 配置。如果有,则会创建该servlet实例。 load-on-starup 参数值越小,优先级越高(最小值为0,优先级最高)。阶段二: 初始化为servlet分配资源,调用init(ServletConfig config);方法 config 对象可以用来访问servlet的初始化参数。初始化参数是使用init-param配置的参数。init可以override。阶段三: 就绪/调用有请求到达容器,容器调用servlet对象的service()方法。HttpServlet的service()方法,会依据请求方式来调用doGet()或者doPost()方法。但是,这两个do 方法默认情况下,会抛出异常,需要子类去override。阶段四: 销毁容器依据自身的算法,将不再需要的servlet对象删除掉。在删除之前,会调用servlet对象的 destroy()方法。destroy()方法用于释放资源。在servlet的整个生命周期当中,init,destroy只会执行一次,而service 方法会执行多次。
-
【问题来源】黑龙江农信社【问题简要】开发 vxml 版本自助语音流程时,本地修改代码后,服务器疑似存在缓存问题【问题类别】vxml 版本自主语音流程开发【AICC解决方案版本】AICC 版本:AICC 8.0.71【问题现象描述】在本地开发 vxml 版本自助语音流程时,通过华为配置管理系统,将流程文件连接到本地项目。华为服务器疑似存在缓存问题。如:在原有main请求响应代码中,播放音频文件 1050.wav,本地修改代码为 busy.wav后,通过电话拨号。提示音依旧播放的是 1050.wav 内容重启华为服务器 或过了不确定时长的时间后,才能生效变成 busy.wav还有个问题,如下图:经常不定时的出现 这个错误,造成电话直接挂机麻烦帮忙定位下造成这两个问题的原因,谢谢
-
前言OOM 几乎是笔者工作中遇到的线上 bug 中最常见的,一旦平时正常的页面在线上出现页面崩溃或者服务无法调用,查看服务器日志后你很可能会看到“Caused by: java.lang.OutOfMlemoryError: Java heap space” 这样的提示,那么毫无疑问表示的是 Java 堆内存溢出了。其中又当属集合内存溢出最为常见。你是否有过把整个数据库表查出来的全字段结果直接赋值给一个 List 对象?是否把未经过过滤处理的数据赋值给 Set 对象进行去重操作?又或者是在高并发的场景下创建大量的集合对象未释放导致 JVM 无法自动回收?Java 堆内存溢出我的解决方案的核心思路有两个:一是从代码入手进行优化;二是从硬件层面对机器做合理配置。一、代码优化下面先说从代码入手怎么解决。1.1Stream 流自分页/** * 以下示例方法都在这个实现类里,包括类的继承和实现 */ @Service public class StudyServiceImpl extends ServiceImpl<StudyMapper, Study> implements StudyService{}在循环里使用 Stream 流的 skip()+limit() 来实现自分页,直至取出所有数据,不满足条件时终止循环 /** * 避免集合内存溢出方法(一) * @return */ private List<StudyVO> getList(){ ArrayList<StudyVO> resultList = new ArrayList<>(); //1、数据库取出源数据,注意只拿 id 字段,不至于溢出 List<String> idsList = this.list(new LambdaQueryWrapper<Study>() .select(Study::getId)).stream() .map(Study::getId) .collect(Collectors.toList()); //2、初始化循环 boolean loop = true; long number = 0; long perSize = 5000; while (loop){ //3、skip()+limit()组合,限制每次只取固定数量的 id List<String> ids = idsList.stream() .skip(number * perSize) .limit(perSize) .collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(ids)){ //根据第3步的 id 去拿数据库的全字段数据,这样也不至于溢出,因为一次只是 5000 条 List<StudyVO> voList = this.listByIds(ids).stream() .map(e -> e.copyProperties(StudyVO.class)) .collect(Collectors.toList()); //addAll() 方法也比较关键,快速地批量添加元素,容量是比较大的 resultList.addAll(voList); } //4、判断是否跳出循环 number++; loop = ids.size() == perSize; } return resultList; }1.2数据库分页这里是用数据库语句查询符合条件的指定条数,循环查出所有数据,不满足条件就跳出循环 /** * 避免集合内存溢出方法(二) * @param param * @return */ private List<StudyVO> getList(String param){ ArrayList<StudyVO> resultList = new ArrayList<>(); //1、构造查询条件 String id = ""; //2、初始化循环 boolean loop = true; int perSize = 5000; while (loop){ //分页,固定每次循环都查 5000 条 Page<Study> studyPage = this.page(new Page<> (NumberUtils.INTEGER_ZERO, perSize), wrapperBuilder(param, id)); if (Objects.nonNull(studyPage)){ List<Study> studyList = studyPage.getRecords(); if (CollectionUtils.isNotEmpty(studyList)){ //3、每次截取固定数量的标识,数组下标减一 id = studyList.get(perSize - NumberUtils.INTEGER_ONE).getId(); //4、判断是否跳出循环 loop = studyList.size() == perSize; //添加进返回的 VO 集合中 resultList.addAll(studyList.stream() .map(e -> e.copyProperties(StudyVO.class)) .collect(Collectors.toList())); } else { loop = false; } } } return resultList; } /** * 条件构造 * @param param * @param id * @return */ private LambdaQueryWrapper<Study> wrapperBuilder(String param, String id){ LambdaQueryWrapper<Study> wrapper = new LambdaQueryWrapper<>(); //只查部分字段,按照 id 的降序排列,形成顺序 wrapper.select(Study::getUserAvatar) .eq(Study::getOpenId, param) .orderByAsc(Study::getId); if (StringUtils.isNotBlank(id)){ //这步很关键,只查比该 id 值大的数据 wrapper.gt(Study::getId, id); } return wrapper; }1.3其它思考以上从根本上还是解决不了内存里处理大量数据的问题,取出 50w 数据放内存的风险就很大了。以下是我的其它解决思路:从业务上拆解:明确什么情况下需要后端处理这么多数据?是否可以考虑在业务流程上进行拆解?或者用其它形式的页面交互代替?数据库设计:数据一般都来源于数据库,库/表设计的时候尽量将表与表之间解耦,表字段的颗粒度放细,即多表少字段,查询时只拿需要的字段;数据放在磁盘:比如放到 MQ 里存储,然后取出的时候注意按固定数量批次取,并且注意释放资源;异步批处理:如果业务对实时性要求不高的话,可以异步批量把数据添加到文件流里,再存入到 OSS 中,按需取用;定时任务处理:询问产品经理该功能或者实现是否是结果必须的?是否一定要同步处理?可以考虑在一个时间段内进行多次操作,缓解大数据量的问题;咨询大数据团队:寻求大数据部门团队的专业支持,对于处理海量数据他们是专业的,看能不能提供一些可参考的建议。二、硬件配置核心思路:加大服务器内存,合理分配服务器的堆内存,并设置好弹性伸缩规则,当触发告警时自动伸缩扩容,保证系统的可用性。2.1云服务器配置以下是阿里云 ECS 管理控制台的编辑页面,可以对 CPU 和内存进行配置。在 ECS 实例伸缩组创建完成后,即可以根据业务规模去创建一个自定义伸缩配置,在业务量大的时候会触发自动伸缩。阿里云 ECS 管理如果是部署在私有云服务器,需要对具体的 JVM 参数进行调优的话,可能还得请团队的资深大佬、或者运维团队的老师来帮忙处理。三、文章小结本篇文章主要是记录一次线上集合内存溢出问题的处理思路,在之后的文章中我会分享一些关于真实项目中处理高并发、缓存的使用、异步/解耦等内容,敬请期待。那么今天的分享到这里就结束了,如有不足和错误,还请大家指正。或者你有其它想说的,也欢迎大家在评论区交流!转载自https://www.cnblogs.com/CodeBlogMan/p/18022444
-
在日常开发中,Date工具类使用频率相对较高,大家通常都会这样写: public static Date getData(String date) throws ParseException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return sdf.parse(date); } public static Date getDataByFormat(String date, String format) throws ParseException { SimpleDateFormat sdf = new SimpleDateFormat(format); return sdf.parse(date); } 这很简单啊,有什么争议吗? 你应该听过“时区”这个名词,大家也都知道,相同时刻不同时区的时间是不一样的。 因此在使用时间时,一定要给出时区信息。 public static void getDataByZone(String param, String format) throws ParseException { SimpleDateFormat sdf = new SimpleDateFormat(format); // 默认时区解析时间表示 Date date = sdf.parse(param); System.out.println(date + ":" + date.getTime()); // 东京时区解析时间表示 sdf.setTimeZone(TimeZone.getTimeZone("Asia/Tokyo")); Date newYorkDate = sdf.parse(param); System.out.println(newYorkDate + ":" + newYorkDate.getTime()); } public static void main(String[] args) throws ParseException { getDataByZone("2023-11-10 10:00:00","yyyy-MM-dd HH:mm:ss"); } 对于当前的上海时区和纽约时区,转化为 UTC 时间戳是不同的时间。 对于同一个本地时间的表示,不同时区的人解析得到的 UTC 时间一定是不同的,反过来不同的本地时间可能对应同一个 UTC。 格式化后出现的时间错乱。 public static void getDataByZoneFormat(String param, String format) throws ParseException { SimpleDateFormat sdf = new SimpleDateFormat(format); Date date = sdf.parse(param); // 默认时区格式化输出 System.out.println(new SimpleDateFormat("[yyyy-MM-dd HH:mm:ss Z]").format(date)); // 东京时区格式化输出 TimeZone.setDefault(TimeZone.getTimeZone("Asia/Tokyo")); System.out.println(new SimpleDateFormat("[yyyy-MM-dd HH:mm:ss Z]").format(date)); } public static void main(String[] args) throws ParseException { getDataByZoneFormat("2023-11-10 10:00:00","yyyy-MM-dd HH:mm:ss"); } 我当前时区的 Offset(时差)是 +8 小时,对于 +9 小时的纽约,整整差了1个小时,北京早上 10 点对应早上东京 11 点。 看看Java 8是如何解决时区问题的: Java 8 推出了新的时间日期类 ZoneId、ZoneOffset、LocalDateTime、ZonedDateTime 和 DateTimeFormatter,处理时区问题更简单清晰。 public static void getDataByZoneFormat8(String param, String format) throws ParseException { ZoneId zone = ZoneId.of("Asia/Shanghai"); ZoneId tokyoZone = ZoneId.of("Asia/Tokyo"); ZoneId timeZone = ZoneOffset.ofHours(2); // 格式化器 DateTimeFormatter dtf = DateTimeFormatter.ofPattern(format); ZonedDateTime date = ZonedDateTime.of(LocalDateTime.parse(param, dtf), zone); // withZone设置时区 DateTimeFormatter dtfz = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss Z"); System.out.println(dtfz.withZone(zone).format(date)); System.out.println(dtfz.withZone(tokyoZone).format(date)); System.out.println(dtfz.withZone(timeZone).format(date)); } public static void main(String[] args) throws ParseException { getDataByZoneFormat8("2023-11-10 10:00:00","yyyy-MM-dd HH:mm:ss"); } Asia/Shanghai对应+8,对应2023-11-10 10:00:00; Asia/Tokyo对应+9,对应2023-11-10 11:00:00; timeZone 是+2,所以对应2023-11-10 04:00:00; 在处理带时区的国际化时间问题,推荐使用jdk8的日期时间类: 通过ZoneId,定义时区; 使用ZonedDateTime保存时间; 通过withZone对DateTimeFormatter设置时区; 进行时间格式化得到本地时间; 思路比较清晰,不容易出错。 在与前端联调时,报了个错,java.lang.NumberFormatException: multiple points,起初我以为是时间格式传的不对,仔细一看,不对啊。 百度一下,才知道是高并发情况下SimpleDateFormat有线程安全的问题。 下面通过模拟高并发,把这个问题复现一下: public static void getDataByThread(String param, String format) throws InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(5); SimpleDateFormat sdf = new SimpleDateFormat(format); // 模拟并发环境,开启5个并发线程 for (int i = 0; i < 5; i++) { threadPool.execute(() -> { for (int j = 0; j < 2; j++) { try { System.out.println(sdf.parse(param)); } catch (ParseException e) { System.out.println(e); } } }); } threadPool.shutdown(); threadPool.awaitTermination(1, TimeUnit.HOURS); } 果不其然,报错。还将2023年转换成2220年,我勒个乖乖。 在时间工具类里,时间格式化,我都是这样弄的啊,没问题啊,为啥这个不行?原来是因为共用了同一个SimpleDateFormat,在工具类里,一个线程一个SimpleDateFormat,当然没问题啦! 可以通过TreadLocal 局部变量,解决SimpleDateFormat的线程安全问题。 public static void getDataByThreadLocal(String time, String format) throws InterruptedException { ExecutorService threadPool = Executors.newFixedThreadPool(5); ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>() { @Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat(format); } }; // 模拟并发环境,开启5个并发线程 for (int i = 0; i < 5; i++) { threadPool.execute(() -> { for (int j = 0; j < 2; j++) { try { System.out.println(sdf.get().parse(time)); } catch (ParseException e) { System.out.println(e); } } }); } threadPool.shutdown(); threadPool.awaitTermination(1, TimeUnit.HOURS); } 看一下SimpleDateFormat.parse的源码: public class SimpleDateFormat extends DateFormat { @Override public Date parse(String text, ParsePosition pos){ CalendarBuilder calb = new CalendarBuilder(); Date parsedDate; try { parsedDate = calb.establish(calendar).getTime(); // If the year value is ambiguous, // then the two-digit year == the default start year if (ambiguousYear[0]) { if (parsedDate.before(defaultCenturyStart)) { parsedDate = calb.addYear(100).establish(calendar).getTime(); } } } } } class CalendarBuilder { Calendar establish(Calendar cal) { boolean weekDate = isSet(WEEK_YEAR) && field[WEEK_YEAR] > field[YEAR]; if (weekDate && !cal.isWeekDateSupported()) { // Use YEAR instead if (!isSet(YEAR)) { set(YEAR, field[MAX_FIELD + WEEK_YEAR]); } weekDate = false; } cal.clear(); // Set the fields from the min stamp to the max stamp so that // the field resolution works in the Calendar. for (int stamp = MINIMUM_USER_STAMP; stamp < nextStamp; stamp++) { for (int index = 0; index <= maxFieldIndex; index++) { if (field[index] == stamp) { cal.set(index, field[MAX_FIELD + index]); break; } } } ... } } 先new CalendarBuilder(); 通过parsedDate = calb.establish(calendar).getTime();解析时间; establish方法内先cal.clear(),再重新构建cal,整个操作没有加锁; 上面几步就会导致在高并发场景下,线程1正在操作一个Calendar,此时线程2又来了。线程1还没来得及处理 Calendar 就被线程2清空了。 因此,通过编写Date工具类,一个线程一个SimpleDateFormat,还是有一定道理的。 ———————————————— 原文链接:https://blog.csdn.net/guorui_java/article/details/135611142
-
为什么需要熔断 微服务集群中,每个应用基本都会依赖一定数量的外部服务。有可能随时都会遇到网络连接缓慢,超时,依赖服务过载,服务不可用的情况,在高并发场景下如果此时调用方不做任何处理,继续持续请求故障服务的话很容易引起整个微服务集群雪崩。比如高并发场景的用户订单服务,一般需要依赖一下服务: 商品服务 账户服务 库存服务 假如此时 账户服务 过载,订单服务持续请求账户服务只能被动的等待账户服务报错或者请求超时,进而导致订单请求被大量堆积,这些无效请求依然会占用系统资源:cpu,内存,数据连接...导致订单服务整体不可用。即使账户服务恢复了订单服务也无法自我恢复。 这时如果有一个主动保护机制应对这种场景的话订单服务至少可以保证自身的运行状态,等待账户服务恢复时订单服务也同步自我恢复,这种自我保护机制在服务治理中叫熔断机制。 熔断 熔断是调用方自我保护的机制(客观上也能保护被调用方),熔断对象是外部服务。 降级 降级是被调用方(服务提供者)的防止因自身资源不足导致过载的自我保护机制,降级对象是自身。 熔断这一词来源时我们日常生活电路里面的熔断器,当负载过高时(电流过大)保险丝会自行熔断防止电路被烧坏,很多技术都是来自生活场景的提炼。 工作原理 熔断器一般具有三个状态: 关闭:默认状态,请求能被到达目标服务,同时统计在窗口时间成功和失败次数,如果达到错误率阈值将会进入断开状态。 断开:此状态下将会直接返回错误,如果有 fallback 配置则直接调用 fallback 方法。 半断开:进行断开状态会维护一个超市时间,到达超时时间开始进入 半断开 状态,尝试允许一部门请求正常通过并统计成功数量,如果请求正常则认为此时目标服务已恢复进入 关闭 状态,否则进入 断开 状态。半断开 状态存在的目的在于实现了自我修复,同时防止正在恢复的服务再次被大量打垮。 使用较多的熔断组件: hystrix circuit breaker(不再维护) hystrix-go resilience4j(推荐) sentinel(推荐) 什么是自适应熔断 基于上面提到的熔断器原理,项目中我们要使用好熔断器通常需要准备以下参数: 错误比例阈值:达到该阈值进入 断开 状态。 断开状态超时时间:超时后进入 半断开 状态。 半断开状态允许请求数量。 窗口时间大小。 实际上可选的配置参数还有非常非常多,参考 https://resilience4j.readme.io/docs/circuitbreaker 对于经验不够丰富的开发人员而言,这些参数设置多少合适心里其实并没有底。 那么有没有一种自适应的熔断算法能让我们不关注参数,只要简单配置就能满足大部分场景? 其实是有的,google sre提供了一种自适应熔断算法来计算丢弃请求的概率: 算法参数: requests:窗口时间内的请求总数 accepts:正常请求数量 K:敏感度,K 越小越容易丢请求,一般推荐 1.5-2 之间 算法解释: 正常情况下 requests=accepts,所以概率是 0。 随着正常请求数量减少,当达到 requests == K* accepts 继续请求时,概率 P 会逐渐比 0 大开始按照概率逐渐丢弃一些请求,如果故障严重则丢包会越来越多,假如窗口时间内 accepts==0 则完全熔断。 当应用逐渐恢复正常时,accepts、requests 同时都在增加,但是 K*accepts 会比 requests 增加的更快,所以概率很快就会归 0,关闭熔断。 代码实现 接下来思考一个熔断器如何实现。 初步思路是: 无论什么熔断器都得依靠指标统计来转换状态,而统计指标一般要求是最近的一段时间内的数据(太久的数据没有参考意义也浪费空间),所以通常采用一个 滑动时间窗口 数据结构 来存储统计数据。同时熔断器的状态也需要依靠指标统计来实现可观测性,我们实现任何系统第一步需要考虑就是可观测性,不然系统就是一个黑盒。 外部服务请求结果各式各样,所以需要提供一个自定义的判断方法,判断请求是否成功。可能是 http.code 、rpc.code、body.code,熔断器需要实时收集此数据。 当外部服务被熔断时使用者往往需要自定义快速失败的逻辑,考虑提供自定义的 fallback() 功能。 下面来逐步分析 go-zero 的源码实现: core/breaker/breaker.go 熔断器接口定义 兵马未动,粮草先行,明确了需求后就可以开始规划定义接口了,接口是我们编码思维抽象的第一步也是最重要的一步。 核心定义包含两种类型的方法: Allow():需要手动回调请求结果至熔断器,相当于手动挡。 DoXXX():自动回调请求结果至熔断器,相当于自动挡,实际上 DoXXX() 类型方法最后都是调用 DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error // 自定义判定执行结果 Acceptable func(err error) bool // 手动回调 Promise interface { // Accept tells the Breaker that the call is successful. // 请求成功 Accept() // Reject tells the Breaker that the call is failed. // 请求失败 Reject(reason string) } Breaker interface { // 熔断器名称 Name() string // 熔断方法,执行请求时必须手动上报执行结果 // 适用于简单无需自定义快速失败,无需自定义判定请求结果的场景 // 相当于手动挡。。。 Allow() (Promise, error) // 熔断方法,自动上报执行结果 // 自动挡。。。 Do(req func() error) error // 熔断方法 // acceptable - 支持自定义判定执行结果 DoWithAcceptable(req func() error, acceptable Acceptable) error // 熔断方法 // fallback - 支持自定义快速失败 DoWithFallback(req func() error, fallback func(err error) error) error // 熔断方法 // fallback - 支持自定义快速失败 // acceptable - 支持自定义判定执行结果 DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error } 熔断器实现 circuitBreaker 继承 throttle,实际上这里相当于静态代理,代理模式可以在不改变原有对象的基础上增强功能,后面我们会看到 go-zero 这样做的原因是为了收集熔断器错误数据,也就是为了实现可观测性。 熔断器实现采用静态代理模式,看起来稍微有点绕脑。 // 熔断器结构体 circuitBreaker struct { name string // 实际上 circuitBreaker熔断功能都代理给 throttle来实现 throttle }// 熔断器接口 throttle interface { // 熔断方法 allow() (Promise, error) // 熔断方法 // DoXXX()方法最终都会该方法 doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error } func (cb *circuitBreaker) Allow() (Promise, error) { return cb.throttle.allow() } func (cb *circuitBreaker) Do(req func() error) error { return cb.throttle.doReq(req, nil, defaultAcceptable) } func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error { return cb.throttle.doReq(req, nil, acceptable) } func (cb *circuitBreaker) DoWithFallback(req func() error, fallback func(err error) error) error { return cb.throttle.doReq(req, fallback, defaultAcceptable) } func (cb *circuitBreaker) DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error { return cb.throttle.doReq(req, fallback, acceptable) } throttle 接口实现类: loggedThrottle 增加了为了收集错误日志的滚动窗口,目的是为了收集当请求失败时的错误日志。 // 带日志功能的熔断器 type loggedThrottle struct { // 名称 name string // 代理对象 internalThrottle // 滚动窗口,滚动收集数据,相当于环形数组 errWin *errorWindow } // 熔断方法 func (lt loggedThrottle) allow() (Promise, error) { promise, err := lt.internalThrottle.allow() return promiseWithReason{ promise: promise, errWin: lt.errWin, }, lt.logError(err) } // 熔断方法 func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error { return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool { accept := acceptable(err) if !accept { lt.errWin.add(err.Error()) } return accept })) } func (lt loggedThrottle) logError(err error) error { if err == ErrServiceUnavailable { // if circuit open, not possible to have empty error window stat.Report(fmt.Sprintf( "proc(%s/%d), callee: %s, breaker is open and requests dropped\nlast errors:\n%s", proc.ProcessName(), proc.Pid(), lt.name, lt.errWin)) } return err } 错误日志收集 errorWindow errorWindow 是一个环形数组,新数据不断滚动覆盖最旧的数据,通过取余实现。 // 滚动窗口 type errorWindow struct { reasons [numHistoryReasons]string index int count int lock sync.Mutex } // 添加数据 func (ew *errorWindow) add(reason string) { ew.lock.Lock() // 添加错误日志 ew.reasons[ew.index] = fmt.Sprintf("%s %s", timex.Time().Format(timeFormat), reason) // 更新index,为下一次写入数据做准备 // 这里用的取模实现了滚动功能 ew.index = (ew.index + 1) % numHistoryReasons // 统计数量 ew.count = mathx.MinInt(ew.count+1, numHistoryReasons) ew.lock.Unlock() } // 格式化错误日志 func (ew *errorWindow) String() string { var reasons []string ew.lock.Lock() // reverse order for i := ew.index - 1; i >= ew.index-ew.count; i-- { reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons]) } ew.lock.Unlock() return strings.Join(reasons, "\n") } 看到这里我们还没看到实际的熔断器实现,实际上真正的熔断操作被代理给了 internalThrottle 对象。 internalThrottle interface { allow() (internalPromise, error) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error } internalThrottle 接口实现 googleBreaker 结构体定义 type googleBreaker struct { // 敏感度,go-zero中默认值为1.5 k float64 // 滑动窗口,用于记录最近一段时间内的请求总数,成功总数 stat *collection.RollingWindow // 概率生成器 // 随机产生0.0-1.0之间的双精度浮点数 proba *mathx.Proba } 可以看到熔断器属性其实非常简单,数据统计采用的是滑动时间窗口来实现。 RollingWindow 滑动窗口 滑动窗口属于比较通用的数据结构,常用于最近一段时间内的行为数据统计。 它的实现非常有意思,尤其是如何模拟窗口滑动过程。 先来看滑动窗口的结构体定义: RollingWindow struct { // 互斥锁 lock sync.RWMutex // 滑动窗口数量 size int // 窗口,数据容器 win *window // 滑动窗口单元时间间隔 interval time.Duration // 游标,用于定位当前应该写入哪个bucket offset int // 汇总数据时,是否忽略当前正在写入桶的数据 // 某些场景下因为当前正在写入的桶数据并没有经过完整的窗口时间间隔 // 可能导致当前桶的统计并不准确 ignoreCurrent bool // 最后写入桶的时间 // 用于计算下一次写入数据间隔最后一次写入数据的之间 // 经过了多少个时间间隔 lastTime time.Duration } window 是数据的实际存储位置,其实就是一个数组,提供向指定 offset 添加数据与清除操作。数组里面按照 internal 时间间隔分隔成多个 bucket。 // 时间窗口 type window struct { // 桶 // 一个桶标识一个时间间隔 buckets []*Bucket // 窗口大小 size int } // 添加数据 // offset - 游标,定位写入bucket位置 // v - 行为数据 func (w *window) add(offset int, v float64) { w.buckets[offset%w.size].add(v) } // 汇总数据 // fn - 自定义的bucket统计函数 func (w *window) reduce(start, count int, fn func(b *Bucket)) { for i := 0; i < count; i++ { fn(w.buckets[(start+i)%w.size]) } } // 清理特定bucket func (w *window) resetBucket(offset int) { w.buckets[offset%w.size].reset() } // 桶 type Bucket struct { // 当前桶内值之和 Sum float64 //当前桶的add总次数 Count int64 } // 向桶添加数据 func (b *Bucket) add(v float64) { // 求和 b.Sum += v // 次数+1 b.Count++ } // 桶数据清零 func (b *Bucket) reset() { b.Sum = 0 b.Count = 0 } window 添加数据: 计算当前时间距离上次添加时间经过了多少个 时间间隔,实际上就是过期了几个 bucket。 清理过期桶的数据 更新 offset,更新 offset 的过程实际上就是在模拟窗口滑动 添加数据 // 添加数据 func (rw *RollingWindow) Add(v float64) { rw.lock.Lock() defer rw.lock.Unlock() // 获取当前写入的下标 rw.updateOffset() // 添加数据 rw.win.add(rw.offset, v) } // 计算当前距离最后写入数据经过多少个单元时间间隔 // 实际上指的就是经过多少个桶 func (rw *RollingWindow) span() int { offset := int(timex.Since(rw.lastTime) / rw.interval) if 0 <= offset && offset < rw.size { return offset } // 大于时间窗口时 返回窗口大小即可 return rw.size } // 更新当前时间的offset // 实现窗口滑动 func (rw *RollingWindow) updateOffset() { // 经过span个桶的时间 span := rw.span() // 还在同一单元时间内不需要更新 if span <= 0 { return } offset := rw.offset // 既然经过了span个桶的时间没有写入数据 // 那么这些桶内的数据就不应该继续保留了,属于过期数据清空即可 // 可以看到这里全部用的 % 取余操作,可以实现按照下标周期性写入 // 如果超出下标了那就从头开始写,确保新数据一定能够正常写入 // 类似循环数组的效果 for i := 0; i < span; i++ { rw.win.resetBucket((offset + i + 1) % rw.size) } // 更新offset rw.offset = (offset + span) % rw.size now := timex.Now() // 更新操作时间 // 这里很有意思 rw.lastTime = now - (now-rw.lastTime)%rw.interval } window 统计数据: // 归纳汇总数据 func (rw *RollingWindow) Reduce(fn func(b *Bucket)) { rw.lock.RLock() defer rw.lock.RUnlock() var diff int span := rw.span() // 当前时间截止前,未过期桶的数量 if span == 0 && rw.ignoreCurrent { diff = rw.size - 1 } else { diff = rw.size - span } if diff > 0 { // rw.offset - rw.offset+span之间的桶数据是过期的不应该计入统计 offset := (rw.offset + span + 1) % rw.size // 汇总数据 rw.win.reduce(offset, diff, fn) } } googleBreaker 判断是否应该熔断 收集滑动窗口内的统计数据 计算熔断概率 // 按照最近一段时间的请求数据计算是否熔断 func (b *googleBreaker) accept() error { // 获取最近一段时间的统计数据 accepts, total := b.history() // 计算动态熔断概率 weightedAccepts := b.k * float64(accepts) // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101 dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1)) // 概率为0,通过 if dropRatio <= 0 { return nil } // 随机产生0.0-1.0之间的随机数与上面计算出来的熔断概率相比较 // 如果随机数比熔断概率小则进行熔断 if b.proba.TrueOnProba(dropRatio) { return ErrServiceUnavailable } return nil } googleBreaker 熔断逻辑实现 熔断器对外暴露两种类型的方法 简单场景直接判断对象是否被熔断,执行请求后必须需手动上报执行结果至熔断器。 func (b *googleBreaker) allow() (internalPromise, error) 复杂场景下支持自定义快速失败,自定义判定请求是否成功的熔断方法,自动上报执行结果至熔断器。 func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error Acceptable 参数目的是自定义判断请求是否成功。 Acceptable func(err error) bool // 熔断方法 // 返回一个promise异步回调对象,可由开发者自行决定是否上报结果到熔断器 func (b *googleBreaker) allow() (internalPromise, error) { if err := b.accept(); err != nil { return nil, err } return googlePromise{ b: b, }, nil } // 熔断方法 // req - 熔断对象方法 // fallback - 自定义快速失败函数,可对熔断产生的err进行包装后返回 // acceptable - 对本次未熔断时执行请求的结果进行自定义的判定,比如可以针对http.code,rpc.code,body.code func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error { // 判定是否熔断 if err := b.accept(); err != nil { // 熔断中,如果有自定义的fallback则执行 if fallback != nil { return fallback(err) } return err } // 如果执行req()过程发生了panic,依然判定本次执行失败上报至熔断器 defer func() { if e := recover(); e != nil { b.markFailure() panic(e) } }() // 执行请求 err := req() // 判定请求成功 if acceptable(err) { b.markSuccess() } else { b.markFailure() } return err } // 上报成功 func (b *googleBreaker) markSuccess() { b.stat.Add(1) } // 上报失败 func (b *googleBreaker) markFailure() { b.stat.Add(0) } // 统计数据 func (b *googleBreaker) history() (accepts, total int64) { b.stat.Reduce(func(b *collection.Bucket) { accepts += int64(b.Sum) total += b.Count }) return } 原文链接: https://mp.weixin.qq.com/s?__biz=Mzg2ODU1MTI0OA==&mid=2247484672&idx=1&sn=43067f7af6b3c6233c15a14cb2ed7505&utm_source=tuicool&utm_medium=referral ———————————————— 原文链接:https://blog.csdn.net/m0_67645544/article/details/123738412
-
什么是微服务的熔断机制 在2017年2月1日,GitLab公司的运维人员就出现过这样的事故。当时运维人员在进行数据库维护时,通过执行rm -rf命令,删除了约300GB生产环境数据。由于数据备份失效,导致整个网站宕机数十个小时。 自2017年5月12日起,全球范围内爆发基于Windows网络共享协议进行攻击传播的蠕虫恶意代码,这是不法分子通过改造之前泄露的NSA黑客武器库中“永恒之蓝”攻击程序发起的网络攻击事件,用户只要开机上网就可被攻击。短短几个小时内,包括英国、俄罗斯、整个欧洲及国内多个高校校内网、大型企业内网和政府机构专网遭到了攻击,被勒索支付高额赎金才能解密恢复文件,对重要数据造成严重的损失。 可见信息系统的安全是一个无法忽视的问题。无论是个人还是组织,即便是最简单的系统,都需要考虑安全防护的措施。服务的熔断机制就是一种对网站进行防护的措施。 服务熔断的定义 对于“熔断”一词,大家应该都不会陌生,在中国股市,就曾经在2016年1月1日至2016年1月8日期间,实施过两次熔断机制。在微服务架构中,服务熔断本质上与股市的熔断机制并无差异,其出发点都是为了更好地控制风险。 服务熔断也称服务隔离或过载保护。在微服务应用中,服务存在一定的依赖关系,形成一定的依赖链,如果某个目标服务调用慢或者有大量超时,造成服务不可用,间接导致其他的依赖服务不可用,最严重的可能会阻塞整条依赖链,最终导致业务系统崩溃(又称雪崩效应)。此时,对该服务的调用执行熔断,对于后续请求,不再继续调用该目标服务,而是直接返回,从而可以快速释放资源。等到目标服务情况好转后,则可恢复其调用。 断路器 断路器(Circuit Breaker)本身是一个电子硬件产品,是电器中一个重要组成部分。断路器可用来分配电能,不用频繁地启动异步电动机,对电源线路及电动机等实行保护,当它们发生严重的过载或者短路及欠压等故障时能自动切断电路,其功能相当于熔断器式开关与过欠热继电器等的组合。 在微服务架构中,也存在所谓断路器或者实现断路器模式的软件构件。将受保护的服务封装在一个可以监控故障的断路器对象中,当故障达到一定门限时,断路器将跳闸,所有后继调用将不会发往受保护的服务而由断路器对象之间返回错误。对于需要更长时间解决的故障问题,由于不断重试没有太大意义了,所以就可以使用断路器模式。 路器模式 Michael Nygard在他编著的书Release lt!中推广了断路器模式。断路器模式致力于防止应用程序反复尝试执行可能失败的操作。允许它继续而不用等待故障被修复,或者在确定故障持续的时候浪费CPU周期。断路器模式还使应用程序能够检测故障是否已解决。如果问题似乎已经解决,应用程序可以尝试调用该操作。 断路器模式的目的不同于重试模式。重试模式使应用程序可以在预期成功的情况下重试操作。 断路器模式阻止应用程序执行可能失败的操作。应用程序可以通过使用重试模式及断路器模式来进行组合。然而,如果断路器指示故障不是瞬态的,则重试逻辑应该对断路器返回异常,并放弃重试尝试。 断路器充当可能失败的操作的代理。代理应监视最近发生的故障的数量,并使用此信息来决定是允许操作继续,还是立即返回异常。 代理可以作为一个状态机来实现,其状态模拟一个电气断路器的功能。 ·关闭(Closed):来自应用程序的请求被路由到操作。代理维护最近失败次数的计数,如果对操作的调用不成功,代理将增加此计数。如果在给定的时间段内最近的失败次数超过了指定的阈值,则代理被置于打开状态。此时代理启动一个超时定时器,当这个定时器超时时,代理被置于半开状态。超时定时器的目的是让系统有时间来解决导致失败的问题,然后再允许应用程序尝试再次执行操作。 ·打开(Open):来自应用程序的请求立即失败,并将异常返回给应用程序。 ·半打开 Half-Open 来自应用程序的有限数量的请求被允许通过并调用操作。如果这些请求成功,则认为先前引起故障的故障已被修复,断路器切换到关闭状态(故障计数器被重置)。如果有任何请求失败,断路器会认为故障仍然存在,因此它将恢复到打开状态,并重新启动超时定时器,以使系统有一段时间从故障中恢复。半开状态有助于防止恢复服务突然被请求淹没。当服务恢复时,它可能能够支持有限的请求量,直到恢复完成,但在进行恢复时,大量工作可能导致服务超时或再次失败。 图15-1展示的是 Microsoft Azure关于断路器状态的设计图。在该图中,关闭状态使用的故障计数器是基于时间的。它会定期自动重置。如果遇到偶尔的故障,这有助于防止断路器进入打开状态。只有在指定的时间间隔内发生指定次数的故障时,才会使断路器跳闸到断路状态的故障阈值。 半打开状态使用的计数器记录调用操作的成功尝试次数。在指定次数的连续操作调用成功后,断路器恢复到关闭状态。如果调用失败,断路器将立即进入打开状态,下一次进入半打开状态时,成功计数器将被重置。 系统恢复的方式可以通过恢复或重新启动故障组件或者修复网络连接来进行外部处理。 Spring Cloud Hystrix可以用来处理依赖隔离,实现熔断机制。其主要的类有HystrixCommand和HystrixObservableCommand等。 熔断的意义 在软件系统中,不可能百分之百保证不存在故障。为了保障整体系统的可用性和容错性,需要将服务实例部署在云或分布式系统环境中。 所以,我们必须承认服务一定是会出现故障的,只有清醒地认识到服务系统的本质,才能更好地去设计系统,来不断提高服务的可用性和容错性。 微服务的故障不可避免,这些故障可能是瞬时的,如慢的网络连接、超时,资源过度使用而暂时不可用;也可能是不容易预见的突发事件的情况下需要更长时间来纠正的故障。针对分布式服务的容错,通常的做法有两种。 ·重试机制,对于预期的短暂故障问题,通过重试模式是可以解决的。 ·断路器模式。 断路器模式所带来的好处 断路器模式提供了稳定性,同时系统从故障中恢复并最大限度地减少对性能的影响。通过快速拒绝可能失败的操作的请求,而不是等待操作超时或永不返回,可以帮助维持系统的响应时间。如果断路器每次改变状态都会产生一个事件,这个信息可以用来监测断路器所保护的系统部分的健康状况,或者在断路器跳到断路状态时提醒管理员。 断路器模式通常是可定制的,可以根据可能的故障类型进行调整。例如,可以自定义定时器的超时。您可以先将断路器置于“打开”状态几秒,然后如果故障仍未解决,则将超时增加到几分钟。 断路器模式的功能 一般来说,断路器具备如下功能。 1.异常处理 通过断路器调用操作的应用程序必须能够处理在操作不可用时可能被抛出的异常,该类异常的处理方式都是应用程序特有的。例如,应用程序会暂时降级其功能,调用备选操作尝试相同的任务或获取相同的数据,或者将异常通知给用户让其稍后重试。 一个请求可能由于各种原因失败,其中有一些可能表明故障严重类型高于其他故障。例如,一个请求可能由于需要几分钟才能恢复的远程服务崩溃而失败,也可能由于服务暂时超载造成的超时而失败。断路器有可能可以检查发生的异常类型,并根据这些异常类型来调整策略。例如,促使切换到打开状态的服务超时异常个数要远多于服务完全不可用导致的故障个数。 2.日志记录 一个断路器应记录所有失败的请求(如果可能的话记录所有请求),以使管理员能够监视它封装下受保护操作的运行状态。 3.可恢复 应该把断路器配置成与受保护操作最匹配的恢复模式。例如,如果设定断路器为打开状态的时间需要很长,即使底层操作故障已经解决,它还会返回错误。如果打开状态切换到半打开态过快,底层操作故障还没解决,它就会再次调用受保护操作。 4.测试失败的操作 在打开状态下,断路器可能不用计时器来确定何时切换到半打开状态,而是通过周期性地查验远程服务或资源以确定它是否已经再次可用。这个检查可能采用上次失败的操作的形式,也可以使用由远程服务提供的专门用于测试服务健康状况的特殊操作。 5.手动复位 在一个系统中,如果一个失败的操作的恢复时间差异很大,则提供一个手动复位选项,以使管理员能够强行关闭断路器及重置故障计数器。同样,如果受保护操作暂时不可用,管理员可以强制断路器进入打开状态并重新启动超时定时器。 6.并发 同—断路器可以被应用程序的大量并发实例访问。断路器实现不应阻塞并发请求或对每一请求增加额外开销。 7.加速断路 有时失败响应对于断路器实现来说包含足够的信息用于判定是否应当立即跳闸,并保持最小时间量的跳闸状态。例如,从过载共享资源的错误响应中可能指示了“不推荐立即重试”,那么应用程序应当隔几分钟之后再进行重试,而不应该立即重试。 如果一个请求的服务对于特定Web服务器不可用,可以返回HTTP协议定义的“HTTP 503Service Unavailable”响应。该响应可以包含额外的信息,如预期延迟持续时间。 8.重试失败请求 在打开状态下,断路器可以不仅仅是快速地简单返回失败,而是可以将每个请求的详细信息记录日志,并在远程资源或服务重新可用时安排重试。 ———————————————— 原文链接:https://blog.csdn.net/m0_63437643/article/details/123149700
-
熔断 微服务架构中,如果需要保障可用性,其中一个方式就是 熔断。熔断在微服务架构里面是指 当微服务本身出现问题的时候,它会拒绝新的请求,直到微服务恢复。通过熔断机制可以给服务端恢复的时间,比如 CPU 使用率已经超出负载了,此时服务端触发了熔断,那么新来的请求就会被拒绝,因此,服务端的 CPU 使用率就会在一段时间内降到100%以内。 判定服务的健康状态 判断微服务是否出现了问题,需要根据自己的业务来选择一些指标来代表这个服务器的健康程度,一般可以使用 响应时间、错误率。不管选择什么指标,都要考虑两个因素: 一是阈值如何选择;二是超过阈值之后,要不要持续一段时间才触发熔断。 假设 把响应时间作为指标,如果业务对响应时间的要求是在 1s 以内,那么你的阈值就可以设定在 1s,或者稍高一点,留点容错的余地也可以,原则上阈值应该明显超过正常响应时间。比如经过一段时间的观测之后,发现这个服务的 99 线是 1s,那么你可以考虑将熔断阈值设定为 1.2s。 如果响应时间一旦超过了阈值,也不能立刻就熔断,而是 要求响应时间超过一段时间之后才触发熔断。这主要是出于两个考虑,一个是响应时间可能是偶发性地突然增长;另外一个则是防止抖动。这个“一段时间”很大程度上就依赖个人经验了,如果时间过短,可能会频繁触发熔断,然后又恢复,再熔断,再恢复…… 反过来,如果时间过长,就可能会导致需要触发熔断的时候却迟迟没有触发。你可以根据经验来设定一个值,比如说三十秒或者一分钟。 总结:为了保障微服务的可用性,在核心服务里面可以接入熔断,针对不同的服务,可以设计不同的微服务熔断策略。比如最简单的熔断策略就是根据响应时间来判定,当响应时间超过阈值一段时间之后就会触发熔断;一般会根据业务情况来选择这个阈值,如果产品方要求响应时间是1s,那么可以把阈值设定在1.2s。如果响应时间超过1.2s,并且持续三十秒,就会触发熔断。在触发熔断的情况下,新请求会被拒绝,而已有的请求还是会被继续处理,直到服务恢复正常。 还可以根据缓存策略设计熔断方案,比如某一个接口得并发很高,对缓存的依赖度非常严重,所以当检测到缓存不可用的时候(比如说 Redis 崩溃了),那么就会触发熔断,因为此时如果不熔断的话,高并发的请求会因为 Redis 崩溃而全部落到 MySQL 上,导致压垮 MySQL。 在触发熔断之后,可以额外开启一个线程(例如 Goroutine)持续不断地 ping Redis的服务是否正常,如果 Redis 恢复了,那么就退出熔断状态,新来的请求就不会被拒绝了。 这种方案类似于处理缓存雪崩的问题,参考:《缓存雪崩、缓存击穿、穿透穿透具体指哪些问题?》 服务恢复正常 如果我们判断一个服务响应时间过长,进入了熔断状态,那么一段时间(比如十分钟)过后,已接收的请求已经被处理完了,也就是服务已经恢复正常了,那么就需要退出熔断状态,继续接收新请求。因此在触发熔断之后,就要考虑检测服务是否已经恢复正常。 如果本身熔断是高并发引起的,那么在一分钟后并发依旧很高,这时候你一旦直接恢复正常,然后高并发的流量打过来,服务是不是又会触发熔断? 这就会出现上面说的“抖动”的情况。 所谓 抖动:就是服务频繁地在正常-熔断两个状态之间切换。引起抖动的原因是多样的,比如说前面提到的一旦超过阈值就进入熔断状态;再比如“一分钟后就认为服务已经恢复正常,继续处理新请求”就容易引发抖动问题。 要解决这个抖动问题,就需要在恢复之后控制住流量。比如说按照 10%、20%、30%……逐步递增,而不是立刻恢复 100% 的流量(需要负载均衡来配合)。在这种逐步放开流量的措施下,依旧有请求因为熔断不会被处理,那么有没有更好的处理方式? 其实,可以让客户端来控制这个流量。就是当服务端触发熔断之后,客户端就直接不再请求这个节点了,而是换一个节点。等到恢复了之后,客户端再逐步对这个节点放开流量。整体思路是利用负载均衡来控制流量,如果一个服务端节点触发了熔断,那么客户端在做负载均衡的时候就可以将这个节点挪出可用列表,后续请求会发给别的节点。在经过一段时间之后,客户端可以尝试发请求给该节点。如果该节点正确处理了,那客户端就可以加大流量,否则客户端就要再一次等待一段时间。 综合运用负载均衡和熔断的方案,重点在于客户端控制流量,并根据服务端节点的状况来操作可用节点列表。参考:《微服务的注册发现和微服务架构下的负载均衡》 万一所有可用节点都触发熔断了,应该怎么办? 如果因为某些原因数据库出问题,导致某个服务所有的节点都触发了熔断,那么客户端就完全没有可用节点了。针对这个问题,熔断解决不了,负载均衡也解决不了,只能通过监控告警之后人手工介入处理了。 整体流程: 服务端在触发熔断的时候,会返回一个代表熔断的错误。 客户端在收到这个错误之后,就会把这个服务端节点暂时挪出可用节点列表。后续所有的新请求都不会再打到这个触发了熔断的服务端节点上了。 客户端在等待一段时间后,逐步放开流量 如果服务端正常处理了新来的请求,那么客户端就加大流量。 如果服务端再次返回了熔断响应,那么客户端就会再一次将这个节点挪出可用列表。 如此循环,直到服务端完全恢复正常,客户端也正常发送请求到该服务端节点。 降级 降级就是在服务资源不够用的时候,停用一部分边缘业务,这部分被停用的边缘业务可以被理解为“全部熔断了”。 比如在双十一之类的大促高峰,电商平台可能会关闭一些服务(比如退款服务)用来保证订单业务尽可能不受影响(当然营销策略部分不在我们的讨论范围),这就是降级的典型应用,不过它是一种手动的跨服务降级。这种降级的好处有两方面:一方面是腾出了服务器资源,可以给订单服务或者支付服务;另外一方面是减少了对公共组件的压力,比如说减少了对数据库的写入压力。 关于服务降级,主要关心的也是两个方面,其一、如何判定一个服务要不要降级(如何判定服务健康);其二、降级之后怎么恢复,也是要考虑抖动的问题。熔断是彻底不提供服务,而降级则是尽量提供不分服务。 所以在一些场景下,既可以用熔断,也可以用降级。比如说在响应时间超过阈值之后,可以考虑选择熔断,完全不提供服务;也可以考虑降级,提供一部分服务。 原则上来说,是应该优先考虑使用降级的,然而有些服务是无法降级的,尤其是写服务。例如 你从前端接收数据,然后写到数据库,这种场景是无法降级的。另外,如果你希望系统负载尽快降低,那么熔断要优于降级。 如何降级?基本上可以分成两大类:跨服务降级 和 本服务提供有损服务。 跨服务降级 当服务资源不够的时候可以暂停某些服务,将腾出来的资源给其他更加重要、更加核心的服务使用。(上面提到的大促期间暂停退款服务就是跨服务降级的例子)。这种策略的要点是,必须要确定一个服务比另外一个服务更有业务价值,或者更加重要。 跨服务降级的措施常见的做法有三个: 整个服务停掉,例如前面提到的停掉退款服务。 停掉服务的部分节点,例如十个节点,停掉其中五个节点,这五个节点被挪作他用。 停止访问某些资源。例如日志中心压力很大的时候,发信号给某些不重要的服务,让它们停止上传日志,只在本地保存日志。 跨服务降级可以在大部分合并部署的服务里面使用,一般的原则就是 B、C端合并部署降级 B 端;付费服务和非付费服务降级非付费服务。当然也可以根据自己的业务价值,将这些部署在同一个节点上的服务分成三六九等。而后在触发降级的时候从不重要的服务开始降级,将资源调配给重要服务。 自身服务提供有损服务 例如各大 App 的首页都会有降级的策略。在没有触发降级的时候,App 首页是针对你个人用户画像的个性化推荐。而在触发了降级之后,则可能是使用榜单数据,或者使用一个运营提前配置好的静态页面。这种策略的要点是你得知道你的服务调用者能够接受什么程度的有损。 针对服务本身的一些常见的降级思路: 返回默认值,这算是最简单的一种状况。 禁用可观测性组件,正常来说在业务里面都充斥了各种各样的埋点。这些埋点本身其实是会带来消耗的,所以在性能达到瓶颈的时候,就可以考虑停用,或者降低采样率。 同步转异步,即正常情况下,服务收到请求之后会立刻处理。但是在降级的情况下,服务在收到请求之后只会返回一个代表“已接收”的响应。后续服务会异步地开启线程来处理,或者依赖于定时任务来处理。 简化流程,如果你处理一个请求需要很多步骤,后续如果有一些步骤不关键的话,可以考虑不执行,或者异步执行。例如在内容生产平台,一般新内容要被推送到推荐系统里面。那么在降级的情况下你可以不推,而后可以考虑异步推送过去,也可以考虑等系统恢复之后再推送过去。 需要注意的是,在任何的故障处理里面,都要考虑恢复策略会不会引起抖动问题。 也可以考虑使用降级来保护 缓存-数据库 结构,一般来说,基本上都是先从缓存里面读数据,如果缓存里面没有数据,就从数据库中读取。那么在触发降级的情况下,可以考虑只从缓存里面读取,如果缓存里面没有数据,那么就直接返回,而不会再去数据库里读取。 这样可以保证在缓存里面有数据的那部分请求可以得到正常处理,也就是提供了有损服务。如果完全不考虑从数据库里取数据,那么你的性能瓶颈就完全取决于缓存,那么服务能够撑住的 QPS 会非常高。但是,如果缓存不命中的时候要去数据库取数据,那么服务的性能会衰退得非常快,即极少数缓存未命中的请求会占据大部分的系统资源。 具体案例分析 如果你的某个服务是同时提供了读服务和写服务,并且读服务明显比写服务更加重要,那么这时候你就可以考虑降级写服务。 假如说现在有一个针对商家的服务(比如 某团外卖),商家调用这些 API 来录入一些数据,比如他们门店的基本信息,上传一些门店图片等。同时还有一个针对 C 端普通用户的服务,这个服务就是把商家录入的数据展示在商家门店的首页上。所以你可以看到在这个场景下, 读服务 QPS 更高,也更加重要。那么如果这两个服务是一起部署的,在需要降级的时候,就可以考虑将针对商家的写服务停掉,将资源都腾出来给针对 C 端用户的读服务。从资源占用的角度分析,虽然整体来说写服务 QPS 占比很低,但是对于数据库来说,一次写请求对性能的压力要远比一次读请求大。所以暂停了写服务之后,数据库的负载能够减轻不少。 具体解决方案1( 读写服务中 降级写服务):接入一个跨服务的降级策略,当发现读服务的响应时间超过了阈值的时候,或者响应时间开始显著上升的时候,就将针对 B 端商家用户的服务临时停掉,腾出来的资源都给 C 端用户使用。对于 B 端用户来说,他们这个阶段是没有办法修改已经录入的数据的,但是这并不是一个特别大的问题。当 C 端接口的响应时间恢复正常之后,会自动恢复 B 端商家接口,商家又可以修改或者录入数据了。 在内容生产平台,作者生产内容,C 端用户查看生产的内容。那么在资源不足的情况下可以考虑停掉内容生产端的服务,只保留 C 端用户查看内容的功能。如果你的用户分成普通用户和 VIP 用户,那么你也可以考虑停掉给普通用户的服务。甚至,如果一个服务既提供给普通用户,也提供给 VIP 用户,你可以考虑将普通用户请求拒绝掉,只服务 VIP 用户。毕竟,VIP用户花钱了! 判断一个服务的业务价值最简单的方法就是判定什么业务带来了多少价值,又或者根据公司的主要营收来源确定服务的业务价值,越是能赚钱的就越重要。 具体解决方案2(快慢路径中 降级慢路径):在查询商品数据中,先查询缓存,如果缓存有数据,那么就直接返回。如果缓存没有,那么就需要去数据库查询。如果此时系统的并发非常高,那么就采取降级策略,将请求标记为降级请求。降级请求只会查询缓存,而不会查询数据库。如果缓存没有,那就直接返回错误。这样能够有效防止因为少部分请求缓存未命中而占据大量系统资源,导致系统吞吐量下降和响应时间显著升高。 这种思路其实可以在很多微服务里面应用。如果一个服务可以分成快路径和慢路径两种逻辑,那么在降级之前就可以先走快路径,再走慢路径。而触发了降级之后,就只允许走快路径。上面例子中,从缓存里加载数据就是快路径,从数据库里面加载数据就是慢路径。 慢路径还可以是发起服务调用或者复杂计算。比如说一个服务快路径是直接查询缓存,而慢路径可能是发起很多微服务调用,拿到所有响应之后一起计算,算出来一个结果并缓存起来。那么在降级的时候,可以有效提高吞吐量。不过这种吞吐量是有损的,毕竟部分请求如果没有在缓存中找到数据,那么就会直接返回失败响应。 ———————————————— 原文链接:https://blog.csdn.net/rxbook/article/details/134510266
-
服务熔断在介绍熔断机制之前,我们需要了解微服务的雪崩效应。在微服务架构中,微服务是完成一个单一的业务功能,这样做的好处是可以做到解耦,每个微服务可以独立演进。但是,一个应用可能会有多个微服务组成,微服务之间的数据交互通过远程过程调用完成。这就带来一个问题,假设微服务A调用微服务B和微服务C,微服务B和微服务C又调用其它的微服务,这就是所谓的“扇出”。如果扇出的链路上某个微服务的调用响应时间过长或者不可用,对微服务A的调用就会占用越来越多的系统资源,进而引起系统崩溃,所谓的“雪崩效应”。熔断机制是应对雪崩效应的一种微服务链路保护机制。我们在各种场景下都会接触到熔断这两个字。高压电路中,如果某个地方的电压过高,熔断器就会熔断,对电路进行保护。股票交易中,如果股票指数过高,也会采用熔断机制,暂停股票的交易。同样,在微服务架构中,熔断机制也是起着类似的作用。当扇出链路的某个微服务不可用或者响应时间太长时,会进行服务的降级,进而熔断该节点微服务的调用,快速返回错误的响应信息。当检测到该节点微服务调用响应正常后,恢复调用链路。在Spring Cloud框架里,熔断机制通过Hystrix实现。Hystrix会监控微服务间调用的状况,当失败的调用到一定阈值,缺省是5秒内20次调用失败,就会启动熔断机制。在dubbo中也可利用nio超时+失败次数做熔断。dubbo可以通过扩展Filter的方式引入Hystrix,具体代码如下:1234567891011121314151617181920package com.netease.hystrix.dubbo.rpc.filter;import com.alibaba.dubbo.common.Constants;import com.alibaba.dubbo.common.extension.Activate;import com.alibaba.dubbo.rpc.Filter;import com.alibaba.dubbo.rpc.Invocation;import com.alibaba.dubbo.rpc.Invoker;import com.alibaba.dubbo.rpc.Result;import com.alibaba.dubbo.rpc.RpcException;@Activate(group = Constants.CONSUMER)public class HystrixFilter implements Filter { @Override public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation); return command.execute(); }}DubboHystrixCommand12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061package com.netease.hystrix.dubbo.rpc.filter;import org.apache.log4j.Logger;import com.alibaba.dubbo.common.URL;import com.alibaba.dubbo.rpc.Invocation;import com.alibaba.dubbo.rpc.Invoker;import com.alibaba.dubbo.rpc.Result;import com.netflix.hystrix.HystrixCommand;import com.netflix.hystrix.HystrixCommandGroupKey;import com.netflix.hystrix.HystrixCommandKey;import com.netflix.hystrix.HystrixCommandProperties;import com.netflix.hystrix.HystrixThreadPoolProperties;public class DubboHystrixCommand extends HystrixCommand { private static Logger logger = Logger.getLogger(DubboHystrixCommand.class); private static final int DEFAULT_THREADPOOL_CORE_SIZE = 30; private Invoker invoker; private Invocation invocation; public DubboHystrixCommand(Invoker invoker,Invocation invocation){ super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName())) .andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(), invocation.getArguments() == null ? 0 : invocation.getArguments().length))) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withCircuitBreakerRequestVolumeThreshold(20)//10秒钟内至少19此请求失败,熔断器才发挥起作用 .withCircuitBreakerSleepWindowInMilliseconds(30000)//熔断器中断请求30秒后会进入半打开状态,放部分流量过去重试 .withCircuitBreakerErrorThresholdPercentage(50)//错误率达到50开启熔断保护 .withExecutionTimeoutEnabled(false))//使用dubbo的超时,禁用这里的超时 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))));//线程池为30 this.invoker=invoker; this.invocation=invocation; } /** * 获取线程池大小 * * @param url * @return */ private static int getThreadPoolCoreSize(URL url) { if (url != null) { int size = url.getParameter("ThreadPoolCoreSize", DEFAULT_THREADPOOL_CORE_SIZE); if (logger.isDebugEnabled()) { logger.debug("ThreadPoolCoreSize:" + size); } return size; } return DEFAULT_THREADPOOL_CORE_SIZE; } @Override protected Result run() throws Exception { return invoker.invoke(invocation); }}线程池大小可以通过dubbo参数进行控制,当前其他的参数也可以通过类似的方式进行配置代码添加好后在,resource添加加载文本|-resources|-META-INF|-dubbo|-com.alibaba.dubbo.rpc.Filter (纯文本文件,内容为:hystrix=com.netease.hystrix.dubbo.rpc.filter.HystrixFilter由于Filter定义为自动激活的,所以启动代码所有消费者都被隔离起来啦!熔段解决如下几个问题: 当所依赖的对象不稳定时,能够起到快速失败的目的 快速失败后,能够根据一定的算法动态试探所依赖对象是否恢复服务降级降级是指自己的待遇下降了,从RPC调用环节来讲,就是去访问一个本地的伪装者而不是真实的服务。 当双11活动时,把无关交易的服务统统降级,如查看蚂蚁深林,查看历史订单,商品历史评论,只显示最后100条等等。区别相同点:目的很一致,都是从可用性可靠性着想,为防止系统的整体缓慢甚至崩溃,采用的技术手段;最终表现类似,对于两者来说,最终让用户体验到的是某些功能暂时不可达或不可用;粒度一般都是服务级别,当然,业界也有不少更细粒度的做法,比如做到数据持久层(允许查询,不允许增删改);自治性要求很高,熔断模式一般都是服务基于策略的自动触发,降级虽说可人工干预,但在微服务架构下,完全靠人显然不可能,开关预置、配置中心都是必要手段;区别:触发原因不太一样,服务熔断一般是某个服务(下游服务)故障引起,而服务降级一般是从整体负荷考虑;管理目标的层次不太一样,熔断其实是一个框架级的处理,每个微服务都需要(无层级之分),而降级一般需要对业务有层级之分(比如降级一般是从最外围服务开始)实现方式不太一样;服务降级具有代码侵入性(由控制器完成/或自动降级),熔断一般称为自我熔断。服务限流在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。缓存的目的是提升系统访问速度和增大系统能处理的容量,可谓是抗高并发流量的银弹;而降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰或者问题解决后再打开;而有些场景并不能用缓存和降级来解决,比如稀缺资源(秒杀、抢购)、写服务(如评论、下单)、频繁的复杂查询(评论的最后几页),因此需有一种手段来限制这些场景的并发/请求量,即限流。限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务(定向到错误页或告知资源没有了)、排队或等待(比如秒杀、评论、下单)、降级(返回兜底数据或默认数据,如商品详情页库存默认有货)。一般开发高并发系统常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。限流算法常见的限流算法有:令牌桶、漏桶。计数器也可以进行粗暴限流实现。漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率.示意图如下:令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量.应用级限流对于一个应用系统来说一定会有极限并发/请求数,即总有一个TPS/QPS阀值,如果超了阀值则系统就会不响应用户请求或响应的非常慢,因此我们最好进行过载保护,防止大量请求涌入击垮系统。如果你使用过Tomcat,其Connector其中一种配置有如下几个参数:acceptCount:如果Tomcat的线程都忙于响应,新来的连接会进入队列排队,如果超出排队大小,则拒绝连接;maxConnections:瞬时最大连接数,超出的会排队等待;maxThreads:Tomcat能启动用来处理请求的最大线程数,如果请求处理量一直远远大于最大线程数则可能会僵死。详细的配置请参考官方文档。另外如MySQL(如max_connections)、Redis(如tcp-backlog)都会有类似的限制连接数的配置。池化技术如果有的资源是稀缺资源(如数据库连接、线程),而且可能有多个系统都会去使用它,那么需要限制应用;可以使用池化技术来限制总资源数:连接池、线程池。比如分配给每个应用的数据库连接是100,那么本应用最多可以使用100个资源,超出了可以等待或者抛异常。限流某个接口的总并发/请求数如果接口可能会有突发访问情况,但又担心访问量太大造成崩溃,如抢购业务;这个时候就需要限制这个接口的总并发/请求数总请求数了;因为粒度比较细,可以为每个接口都设置相应的阀值。可以使用Java中的AtomicLong进行限流:12345678try {if(atomic.incrementAndGet() > 限流数) {//拒绝请求 }//处理请求} finally { atomic.decrementAndGet();}分布式限流分布式限流最关键的是要将限流服务做成原子化,而解决方案可以使使用redis+lua或者nginx+lua技术进行实现,通过这两种技术可以实现的高并发和高性能。首先我们来使用redis+lua实现时间窗内某个接口的请求数限流,实现了该功能后可以改造为限流总并发/请求数和限制总资源数。Lua本身就是一种编程语言,也可以使用它实现复杂的令牌桶或漏桶算法。有人会纠结如果应用并发量非常大那么redis或者nginx是不是能抗得住;不过这个问题要从多方面考虑:你的流量是不是真的有这么大,是不是可以通过一致性哈希将分布式限流进行分片,是不是可以当并发量太大降级为应用级限流;对策非常多,可以根据实际情况调节;像在京东使用Redis+Lua来限流抢购流量,一般流量是没有问题的。对于分布式限流目前遇到的场景是业务上的限流,而不是流量入口的限流;流量入口限流应该在接入层完成,而接入层笔者一般使用Nginx。基于Redis功能的实现限流简陋的设计思路:假设一个用户(用IP判断)每分钟访问某一个服务接口的次数不能超过10次,那么我们可以在Redis中创建一个键,并此时我们就设置键的过期时间为60秒,每一个用户对此服务接口的访问就把键值加1,在60秒内当键值增加到10的时候,就禁止访问服务接口。在某种场景中添加访问时间间隔还是很有必要的。基于令牌桶算法的实现令牌桶算法最初来源于计算机网络。在网络传输数据时,为了防止网络拥塞,需限制流出网络的流量,使流量以比较均匀的速度向外发送。令牌桶算法就实现了这个功能,可控制发送到网络上数据的数目,并允许突发数据的发送。Java实现我们可以使用Guava 的 RateLimiter 来实现基于令牌桶的流控,RateLimiter 令牌桶算法是单桶实现。RateLimiter 对简单的令牌桶算法做了一些工程上的优化,具体的实现是 SmoothBursty。需要注意的是,RateLimiter 的另一个实现SmoothWarmingUp,就不是令牌桶了,而是漏桶算法。也许是出于简单起见,RateLimiter 中的时间窗口能且仅能为 1s。SmoothBursty 有一个可以放 N 个时间窗口产生的令牌的桶,系统空闲的时候令牌就一直攒着,最好情况下可以扛 N 倍于限流值的高峰而不影响后续请求。RateLimite允许某次请求拿走超出剩余令牌数的令牌,但是下一次请求将为此付出代价,一直等到令牌亏空补上,并且桶中有足够本次请求使用的令牌为止。当某次请求不能得到所需要的令牌时,这时涉及到一个权衡,是让前一次请求干等到令牌够用才走掉呢,还是让它先走掉后面的请求等一等呢?Guava 的设计者选择的是后者,先把眼前的活干了,后面的事后面再说。合理的要求是对我的锻炼,不合理的要求是对我的磨练!原文链接:https://www.cnblogs.com/yefeng654321/articles/11939933.html
-
服务降级:系统有限的资源的合理协调概念:服务降级一般是指在服务器压力剧增的时候,根据实际业务使用情况以及流量,对一些服务和页面有策略的不处理或者用一种简单的方式进行处理,从而释放服务器资源的资源以保证核心业务的正常高效运行。原因: 服务器的资源是有限的,而请求是无限的。在用户使用即并发高峰期,会影响整体服务的性能,严重的话会导致宕机,以至于某些重要服务不可用。故高峰期为了保证核心功能服务的可用性,就需要对某些服务降级处理。可以理解为舍小保大应用场景: 多用于微服务架构中,一般当整个微服务架构整体的负载超出了预设的上限阈值(和服务器的配置性能有关系),或者即将到来的流量预计会超过预设的阈值时(比如双11、6.18等活动或者秒杀活动)服务降级是从整个系统的负荷情况出发和考虑的,对某些负荷会比较高的情况,为了预防某些功能(业务场景)出现负荷过载或者响应慢的情况,在其内部暂时舍弃对一些非核心的接口和数据的请求,而直接返回一个提前准备好的fallback(退路)错误处理信息。这样,虽然提供的是一个有损的服务,但却保证了整个系统的稳定性和可用性。需要考虑的问题:区分那些服务为核心?那些非核心降级策略(处理方式,一般指如何给用户友好的提示或者操作)自动降级还是手动降服务熔断:应对雪崩效应的链路自我保护机制。可看作降级的特殊情况概念:应对微服务雪崩效应的一种链路保护机制,类似股市、保险丝原因: 微服务之间的数据交互是通过远程调用来完成的。服务A调用服务,服务B调用服务c,某一时间链路上对服务C的调用响应时间过长或者服务C不可用,随着时间的增长,对服务C的调用也越来越多,然后服务C崩溃了,但是链路调用还在,对服务B的调用也在持续增多,然后服务B崩溃,随之A也崩溃,导致雪崩效应服务熔断是应对雪崩效应的一种微服务链路保护机制。例如在高压电路中,如果某个地方的电压过高,熔断器就会熔断,对电路进行保护。同样,在微服务架构中,熔断机制也是起着类似的作用。当调用链路的某个微服务不可用或者响应时间太长时,会进行服务熔断,不再有该节点微服务的调用,快速返回错误的响应信息。当检测到该节点微服务调用响应正常后,恢复调用链路。服务熔断的作用类似于我们家用的保险丝,当某服务出现不可用或响应超时的情况时,为了防止整个系统出现雪崩,暂时停止对该服务的调用。在Spring Cloud框架里,熔断机制通过Hystrix实现。Hystrix会监控微服务间调用的状况,当失败的调用到一定阈值,缺省是5秒内20次调用失败,就会启动熔断机制。-应用场景:微服务架构中,多个微服务相互调用出使用需要考虑问题:如何所依赖的服务对象不稳定失败之后如何快速恢复依赖对象,如何探知依赖对象是否恢复服务降级和服务熔断区别触发原因不一样,服务熔断由链路上某个服务引起的,服务降级是从整体的负载考虑管理目标层次不一样,服务熔断是一个框架层次的处理,服务降级是业务层次的处理实现方式不一样,服务熔断一般是自我熔断恢复,服务降级相当于人工控制触发原因不同 服务熔断一般是某个服务(下游服务)故障引起,而服务降级一般是从整体负荷考虑;一句话:服务熔断是应对系统服务雪崩的一种保险措施,给出的一种特殊降级措施。而服务降级则是更加宽泛的概念,主要是对系统整体资源的合理分配以应对压力。服务熔断是服务降级的一种特殊情况,他是防止服务雪崩而采取的措施。系统发生异常或者延迟或者流量太大,都会触发该服务的服务熔断措施,链路熔断,返回兜底方法。这是对局部的一种保险措施。服务降级是对系统整体资源的合理分配。区分核心服务和非核心服务。对某个服务的访问延迟时间、异常等情况做出预估并给出兜底方法。这是一种全局性的考量,对系统整体负荷进行管理。限流:限制并发的请求访问量,超过阈值则拒绝;降级:服务分优先级,牺牲非核心服务(不可用),保证核心服务稳定;从整体负荷考虑;熔断:依赖的下游服务故障触发熔断,避免引发本系统崩溃;系统自动执行和恢复原文链接:https://zhuanlan.zhihu.com/p/341939685
-
一、微服务三板斧 在开发微服务系统时我们通常会面临着高并发问题的考验,为了保证服务的可用性,通常会使用降级、限流和熔断进行处理。接下来我们介绍下这三个基本的概念:服务熔断、服务降级和服务限流,为后面讲解Alibaba的Sentinel组件打下扎实的基础。 1.1 服务降级 服务降级一般是指在服务器压力剧增的时候,根据实际业务使用情况以及流量,对一些服务和页面有策略的不处理或者用一种简单的方式进行处理,从而释放服务器资源的资源以保证核心业务的正常高效运行。通常原因为服务器的资源是有限的,而请求是无限的。在用户使用即并发高峰期,会影响整体服务的性能,严重的话会导致宕机,以至于某些重要服务不可用。故高峰期为了保证核心功能服务的可用性,就需要对某些服务降级处理。可以理解为舍小保大,通常处理为不让客户端等待而是立即返回一个友好的提示。 服务降级是从整个系统的负荷情况出发和考虑的,对某些负荷会比较高的情况,为了预防某些功能(业务场景)出现负荷过载或者响应慢的情况,在其内部暂时舍弃对一些非核心的接口和数据的请求,而直接返回一个提前准备好的fallback(兜底处理)错误处理信息。这样,虽然提供的是一个有损的服务,但却保证了整个系统的稳定性和可用性。 1.2 服务熔断 在介绍熔断机制之前,我们需要了解微服务的雪崩效应。在微服务架构中,微服务是完成一个单一的业务功能,这样做的好处是可以做到解耦,每个微服务可以独立演进。但是,一个应用可能会有多个微服务组成,微服务之间的数据交互通过远程过程调用完成。这就带来一个问题,假设微服务A调用微服务B和微服务C,微服务B和微服务C又调用其它的微服务,这就是所谓的“扇出”。如果扇出的链路上某个微服务的调用响应时间过长或者不可用,对微服务A的调用就会占用越来越多的系统资源,进而引起系统崩溃,所谓的“雪崩效应”。 熔断机制是应对雪崩效应的一种微服务链路保护机制。我们在各种场景下都会接触到熔断这两个字。高压电路中,如果某个地方的电压过高,熔断器就会熔断,对电路进行保护。同样,在微服务架构中,熔断机制也是起着类似的作用。当扇出链路的某个微服务不可用或者响应时间太长时,会进行服务的降级,进而熔断该节点微服务的调用,快速返回错误的响应信息。当检测到该节点微服务调用响应正常后,恢复调用链路。 1.3 服务熔断和服务降级的区别 这里主要从三个原因分开进行比较,最后 触发原因:服务熔断由链路上某个服务引起的,也就是说,服务熔断一般是某个服务(下游服务)故障引起的,而服务降级是从整体的负载考虑。服务熔断是应对系统服务雪崩的一种保险措施,给出的一种特殊降级措施。而服务降级则是更加宽泛的概念,主要是对系统整体资源的合理分配以应对压力。 管理目标层次:服务熔断是一个框架层次的处理,服务降级是业务层次的处理。 实现方式:服务熔断一般是自我熔断恢复,调用的降级处理是在客户端进行降级处理(编写相应的兜底方法),而服务降级相当于是在服务端进行的兜底方案控制。 总的来说: 服务熔断是服务降级的一种特殊情况,是防止服务雪崩而采取的措施。系统发生异常或者延迟或者流量太大,都会触发该服务的服务熔断措施,链路熔断,返回兜底方法。这是对局部的一种保险措施。 服务降级是对系统整体资源的合理分配。区分核心服务和非核心服务。对某个服务的访问延迟时间、异常等情况做出预估并给出兜底方法。这是一种全局性的考量,对系统整体负荷进行管理。 1.3 服务限流 在上述两种方式中,最终服务的确可以使用,但是访问的却是缺省的服务,比如服务熔断和服务降级最终都会调用相应的兜底方案返回,也就是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰或者问题解决后再打开;而有些场景并不能用熔断和降级来解决,比如稀缺资源(秒杀、抢购)、写服务(如评论、下单)、频繁的复杂查询(评论的最后几页),因此需有一种手段来限制这些场景的并发/请求量,即限流。 限流的目的是通过对并发请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务(定向到错误页或告知资源没有了)、排队或等待(比如秒杀、评论、下单)、降级(返回兜底数据或默认数据,如商品详情页库存默认有货)。 一般开发高并发系统常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。 常见的限流算法有:令牌桶、漏桶(Sentinel采用的)。计数器也可以进行简单的限流实现。 漏桶算法(Leaky Bucket)思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。示意图如下: 令牌桶算法(Token Bucket)和漏桶效果一样,但方向相反,更加容易理解。随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了。新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务。 令牌桶的另外一个好处是可以方便的改变速度,一旦需要提高速率,则按需提高放入桶中的令牌的速率。一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量。 二、总结 本文主要介绍微服务调用过程中的三个基本概念(服务降级、服务熔断、服务限流)进行介绍,为下节进行Sentinel探索打下基础。 ———————————————— 原文链接:https://blog.csdn.net/dl962454/article/details/122193938
-
1 什么是熔断降级 熔断就是“保险丝”。当出现某些状况时,切断服务,从而防止应用程序不断地尝试执 行可能会失败的操作给系统造成“雪崩”,或者大量的超时等待导致系统卡死。 2:降级的目的 降级的目的是当某个服务提供者发生故障的时候,向调用方返回一个错误响应或者替代 响应。举例子:调用联通接口服务器发送短信失败之后,改用移动短信服务器发送,如果移 动短信服务器也失败,则改用电信短信服务器,如果还失败,则返回“失败”响应;在从推 荐商品服务器加载数据的时候,如果失败,则改用从缓存中加载,如果缓存中也加载失败, 则返回一些本地替代数据。 一:Polly 简单使用 建立一个NETCore控制台项目:安装NuGet包:Install-Package Polly -Version 6.0.1 在Program.cs中添加一下代码,每一个部分分为一种方法 启动项目的方法最好用cmd命令执行也可实例调试,命令执行:dotnet PollyTest.dll(PollyTest.dll代表该项目bin目录下的程序dll); using Polly; using System; namespace PollyTest { class Program { static void Main(string[] args) { #region 异常捕获 第一种方法 //实例化 Policy policy = Policy .Handle<ArgumentException>() //故障 .Fallback(() =>//动作 { Console.WriteLine("执行出错"); }); policy.Execute(() => {//在策略中执行业务代码 //这里是可能会产生问题的业务系统代码 Console.WriteLine("开始任务"); throw new ArgumentException("Hello world!"); Console.WriteLine("完成任务"); }); #endregion #region 重载异常 第二种方法 //如果没有被Handle处理的异常,则会导致未处理异常被抛出。还可以用Fallback的其他重载获取异常信息: Policy policy = Policy .Handle<ArgumentException>() //故障 .Fallback(() =>//动作 { Console.WriteLine("执行出错"); }, ex => { Console.WriteLine(ex); }); policy.Execute(() => { Console.WriteLine("开始任务"); throw new ArgumentException("Hello world!"); Console.WriteLine("完成任务"); }); #endregion #region 返回值 第三种方法 //如果Execute中的代码是带返回值的,那么只要使用带泛型的Policy<T> 类即可: Policy<string> policy = Policy<string> .Handle<Exception>() //故障 .Fallback(() =>//动作 { Console.WriteLine("执行出错"); return "降级的值"; }); string value = policy.Execute(() => { Console.WriteLine("开始任务"); throw new Exception("Hello world!"); Console.WriteLine("完成任务"); return "正常的值"; }); #endregion Console.ReadKey(); } } } 重试处理 using Polly; using System; namespace PollyTest { class Program { static void Main(string[] args) { #region 永久重试 //RetryForever()是一直重试直到成功 //Retry()是重试最多一次; //Retry(n) 是重试最多n次; //WaitAndRetry()可以实现“如果出错等待100ms再试还不行再等150ms秒。 Policy policy = Policy .Handle<Exception>() .Retry(3); policy.Execute(() => { Console.WriteLine("开始任务"); if (DateTime.Now.Second % 10 != 0) { throw new Exception("出错"); } Console.WriteLine("完成任务"); }); #endregion Console.ReadKey(); } } } 短路保护 Circuit Breaker 出现N次连续错误,则把“熔断器”(保险丝)熔断,等待一段时间,等待这段时间内如果再Execute 则直接抛出BrokenCircuitException异常,根本不会再去尝试调用业务代码。等待时间过去之后,再 执行Execute的时候如果又错了(一次就够了),那么继续熔断一段时间,否则就恢复正常。 这样就避免一个服务已经不可用了,还是使劲的请求给系统造成更大压力。 using Polly; using System; namespace PollyTest { class Program { static void Main(string[] args) { #region 短路保护 Policy policy = Policy .Handle<Exception>() .CircuitBreaker(6, TimeSpan.FromSeconds(5));//连续出错6次之后熔断5秒(不会再去尝试执行业务代码)。 while (true) { Console.WriteLine("开始Execute"); try { policy.Execute(() => { Console.WriteLine("开始任务"); throw new Exception("出错"); Console.WriteLine("完成任务"); }); } catch (Exception ex) { Console.WriteLine("execute出错" + ex); } Thread.Sleep(500); } Console.ReadKey(); #endregion } } } 策略封装 可以把多个ISyncPolicy合并到一起执行: policy3= policy1.Wrap(policy2); 执行policy3就会把policy1、policy2封装到一起执行 policy9=Policy.Wrap(policy1, policy2, policy3, policy4, policy5);把更多一起封装。 超时处理 #region 超时处理 //下面代码实现了“出现异常则重试三次,如果还出错就FallBack” Policy policyRetry = Policy.Handle<Exception>() .Retry(3); Policy policyFallback = Policy.Handle<Exception>() .Fallback(() => { Console.WriteLine("降级"); }); //Wrap:包裹。policyRetry在里面,policyFallback裹在外面。 //如果里面出现了故障,则把故障抛出来给外面 Policy policy = policyFallback.Wrap(policyRetry); policy.Execute(() => { Console.WriteLine("开始任务"); if (DateTime.Now.Second % 10 != 0) { throw new Exception("出错"); } Console.WriteLine("完成任务"); }); #endregion 二:AOP 框架基础 要求懂的知识:AOP、Filter、反射(Attribute)。 如果直接使用 Polly,那么就会造成业务代码中混杂大量的业务无关代码。我们使用 AOP (如果不了解 AOP,请自行参考网上资料)的方式封装一个简单的框架,模仿 Spring cloud 中的 Hystrix。 需要先引入一个支持.Net Core 的 AOP,目前我发现的最好的.Net Core 下的 AOP 框架是 AspectCore(国产,动态织入),其他要不就是不支持.Net Core,要不就是不支持对异步方法进行 拦截。MVC Filter GitHub:https://github.com/dotnetcore/AspectCore-Framework 命令:Install-Package AspectCore.Core -Version 0.5.0 在该控制台项目中添加Nuget包,并且新建一个CustomInterceptorAttribute类 using AspectCore.DynamicProxy; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace PollyTest { //安装NuGet包AspectCore.Core 如果vs是2.1版本则包要选择1.2.0 vs是3.0则直接安装即可 public class CustomInterceptorAttribute : AbstractInterceptorAttribute { //每个被拦截的方法中执行 public async override Task Invoke(AspectContext context, AspectDelegate next) { try { Console.WriteLine("Before service call"); await next(context);//执行被拦截的方法 } catch (Exception) { Console.WriteLine("Service threw an exception!"); throw; } finally { Console.WriteLine("After service call"); } } } } 在该控制台项目中新建一个被代理拦截的Person类 public class Person { [CustomInterceptor] public virtual void Say(string msg) { Console.WriteLine("service calling..." + msg); } } 在该项目下的程序入口Program类中写入代码进行测试 static void Main(string[] args) { //创建代理 ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder(); using (IProxyGenerator proxyGenerator = proxyGeneratorBuilder.Build()) { Person p = proxyGenerator.CreateClassProxy<Person>(); p.Say("rupeng.com"); } } 三:创建简单的熔断降级框架 熔断降级初级框架实现 1:在该控制台项目中新建一个HystrixCommandAttribute类 [AttributeUsage(AttributeTargets.Method)] public class HystrixCommandAttribute : AbstractInterceptorAttribute { #region 初级框架 public HystrixCommandAttribute(string fallBackMethod) { this.FallBackMethod = fallBackMethod; } public string FallBackMethod { get; set; } public override async Task Invoke(AspectContext context, AspectDelegate next) { try { await next(context);//执行被拦截的方法 } catch (Exception ex) { //context.ServiceMethod 被拦截的方法。context.ServiceMethod.DeclaringType被拦截方法所在的类 //context.Implementation 实际执行的对象 p //context.Parameters 方法参数值 //如果执行失败,则执行 FallBackMethod var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallBackMethod); //反射调用方法 反射的方法Invoke(); Object fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters); context.ReturnValue = fallBackResult; } } #endregion } 2:将该控制台项目中Person类中添加方法(Person类没有就新建,因为这里已经创建过了是直接在上述的基础上进行添加) public class Person { [HystrixCommand("TengXunBackAsync")] //TengXunBackAsync表示如果小米短信失败调的下一个方法名称 public virtual async Task<string> XiaoMiAsync(string name) { throw new Exception("小米短信出现异常!");//自定义抛出一个异常 测试小米失败是否会跳到腾讯短信 Console.WriteLine("小米短信" + name); return "ok"; } [HystrixCommand("HuaWeiBackAsync")] public virtual async Task<string> TengXunBackAsync(string name) { throw new Exception("腾讯短信出现异常!");//自定义抛出一个异常 测试腾讯失败是否会跳到华为短信 Console.WriteLine("腾讯短信" + name); return "ok"; } public async Task<string> HuaWeiBackAsync(string name) { //如果该方法也想在出现异常时调用另一个方法 那本方法就要加一个virtual的修饰符 Console.WriteLine("华为短信" + name); return "ok"; } } 3:在程序入口Program.cs中进行测试 //创建代理 ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder(); using (IProxyGenerator proxyGenerator = proxyGeneratorBuilder.Build()) { Person p = proxyGenerator.CreateClassProxy<Person>(); p.XiaoMiAsync("rupeng.com"); } 四:细化框架 在HystrixCommandAttribute类中添加细化初级框架的代码 添加NuGet包Microsoft.Extensions.Caching.Memory HystrixCommandAttribute类如下: [AttributeUsage(AttributeTargets.Method)] public class HystrixCommandAttribute : AbstractInterceptorAttribute { #region 细化初级框架 /// <summary> /// 最多重试几次,如果为0则不重试 /// </summary> public int MaxRetryTimes { get; set; } = 0; /// <summary> /// 重试间隔的毫秒数 /// </summary> public int RetryIntervalMilliseconds { get; set; } = 100; /// <summary> /// 是否启用熔断 /// </summary> public bool EnableCircuitBreaker { get; set; } = false; /// <summary> /// 熔断前出现允许错误几次 /// </summary> public int ExceptionsAllowedBeforeBreaking { get; set; } = 3; /// <summary> /// 熔断多长时间(毫秒) /// </summary> public int MillisecondsOfBreak { get; set; } = 1000; /// <summary> /// 执行超过多少毫秒则认为超时(0表示不检测超时) /// </summary> public int TimeOutMilliseconds { get; set; } = 0; /// <summary> /// 缓存多少毫秒(0表示不缓存),用“类名+方法名+所有参数ToString拼接”做缓存Key /// </summary> public int CacheTTLMilliseconds { get; set; } = 0; private IAsyncPolicy policy; //NuGet包Microsoft.Extensions.Caching.Memory private static readonly Microsoft.Extensions.Caching.Memory.IMemoryCache memoryCache = new Microsoft.Extensions.Caching.Memory.MemoryCache(new Microsoft.Extensions.Caching.Memory.MemoryCacheOptions()); /// <summary> ///构造函数 /// </summary> /// <param name="fallBackMethod">降级的方法名</param> public HystrixCommandAttribute(string fallBackMethod, int maxRetryTimes) { this.FallBackMethod = fallBackMethod; this.MaxRetryTimes = maxRetryTimes; } /// <summary> ///构造函数重载 /// </summary> /// <param name="fallBackMethod">降级的方法名</param> public HystrixCommandAttribute(string fallBackMethod) { this.FallBackMethod = fallBackMethod; } public string FallBackMethod { get; set; } public override async Task Invoke(AspectContext context, AspectDelegate next) { //一个HystrixCommand中保持一个policy对象即可 //其实主要是CircuitBreaker要求对于同一段代码要共享一个policy对象 //根据反射原理,同一个方法就对应一个HystrixCommandAttribute,无论几次调用, //而不同方法对应不同的HystrixCommandAttribute对象,天然的一个policy对象共享 //因为同一个方法共享一个policy,因此这个CircuitBreaker是针对所有请求的。 //Attribute也不会在运行时再去改变属性的值,共享同一个policy对象也没问题 lock (this)//因为Invoke可能是并发调用,因此要确保policy赋值的线程安全 { if (policy == null) { policy = Policy.NoOpAsync();//创建一个空的Policy //如果启用熔断 if (EnableCircuitBreaker) { //策略封装 policy = policy.WrapAsync(Policy.Handle<Exception>().CircuitBreakerAsync(ExceptionsAllowedBeforeBreaking, TimeSpan.FromMilliseconds(MillisecondsOfBreak))); } //超时时间 if (TimeOutMilliseconds > 0) { //策略封装执行超时 policy = policy.WrapAsync(Policy.TimeoutAsync(() => TimeSpan.FromMilliseconds(TimeOutMilliseconds), Polly.Timeout.TimeoutStrategy.Pessimistic)); } //最大重试次数 if (MaxRetryTimes > 0) { //策略封装最多试几次 policy = policy.WrapAsync(Policy.Handle<Exception>().WaitAndRetryAsync(MaxRetryTimes, i => TimeSpan.FromMilliseconds(RetryIntervalMilliseconds))); } IAsyncPolicy policyFallBack = Policy .Handle<Exception>() .FallbackAsync(async (ctx, t) => { //这里拿到的就是ExecuteAsync(ctx => next(context), pollyCtx);这里传的pollyCtx AspectContext aspectContext = (AspectContext)ctx["aspectContext"]; //获取降级的方法 var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallBackMethod); //调用降级的方法 Object fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters); //不能如下这样,因为这是闭包相关,如果这样写第二次调用Invoke的时候context指向的 //还是第一次的对象,所以要通过Polly的上下文来传递AspectContext //context.ReturnValue = fallBackResult; aspectContext.ReturnValue = fallBackResult; }, async (ex, t) => { }); policy = policyFallBack.WrapAsync(policy); } } //把本地调用的AspectContext传递给Polly,主要给FallbackAsync中使用,避免闭包的坑 Context pollyCtx = new Context();//Context是polly中通过Execute给FallBack、Execute等回调方法传上下文对象使用的 pollyCtx["aspectContext"] = context;//context是aspectCore的上下文 //Install-Package Microsoft.Extensions.Caching.Memory if (CacheTTLMilliseconds > 0) { //用类名+方法名+参数的下划线连接起来作为缓存key string cacheKey = "HystrixMethodCacheManager_Key_" + context.ServiceMethod.DeclaringType + "." + context.ServiceMethod + string.Join("_", context.Parameters); //尝试去缓存中获取。如果找到了,则直接用缓存中的值做返回值 if (memoryCache.TryGetValue(cacheKey, out var cacheValue)) { context.ReturnValue = cacheValue; } else { //如果缓存中没有,则执行实际被拦截的方法 await policy.ExecuteAsync(ctx => next(context), pollyCtx); //存入缓存中 using (var cacheEntry = memoryCache.CreateEntry(cacheKey)) { cacheEntry.Value = context.ReturnValue;//返回值放入缓存 cacheEntry.AbsoluteExpiration = DateTime.Now + TimeSpan.FromMilliseconds(CacheTTLMilliseconds); } } } else//如果没有启用缓存,就直接执行业务方法 { await policy.ExecuteAsync(ctx => next(context), pollyCtx); } } #endregion } 将该控制台项目中Person类中修改方法 public class Person { [HystrixCommand("TengXunBackAsync",3)] //("TengXunBackAsync",3)第一个参数表示如果小米短信失败调的下一个方法,第二个表示如果失败重试的次数 //更多参数属性的使用效果可参照该项目HystrixCommandAttribute类中属性 public virtual async Task<string> XiaoMiAsync(string name) { Console.WriteLine("小米短信"); throw new Exception("小米短信出现异常!");//如果在规定的请求数类全部失败则会进入下一个方法 本次测试时测试三次重试 Console.WriteLine("小米短信" + name); return "ok"; } [HystrixCommand("HuaWeiBackAsync",3)] public virtual async Task<string> TengXunBackAsync(string name) { Console.WriteLine("腾讯短信"); throw new Exception("腾讯短信出现异常!"); Console.WriteLine("腾讯短信" + name); return "ok"; } public async Task<string> HuaWeiBackAsync(string name) { //如果该方法也想在出现异常时调用另一个方法 那本方法就要加一个virtual的修饰符 Console.WriteLine("华为短信" + name); return "ok"; } } 最后Program.cs中进行测试 //创建代理 ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder(); using (IProxyGenerator proxyGenerator = proxyGeneratorBuilder.Build()) { Person p = proxyGenerator.CreateClassProxy<Person>(); p.XiaoMiAsync("rupeng.com"); } Console.ReadKey(); 文章知识点与官方知识档案匹配,可进一步学习相关知识 ———————————————— 原文链接:https://blog.csdn.net/weixin_44481764/article/details/102947204
推荐直播
-
通用人工智能(AGI)到来前夕如何实现企业降本增效和应用现代化
2024/04/19 周五 14:00-16:00
李京峰 T3出行VP/CTO
李京峰是T3出行CTO,本次他将分享通用人工智能(AGI)到来前夕,如何实现企业降本增效和应用现代化。
回顾中 -
华为云云原生FinOps解决方案,为您释放云原生最大价值
2024/04/24 周三 16:30-18:00
Roc 华为云云原生DTSE技术布道师
还在对CCE集群成本评估感到束手无策?还在担心不合理的K8s集群资源申请和过度浪费?华为云容器服务CCE全新上线云原生FinOps中心,为用户提供多维度集群成本可视化,结合智能规格推荐、混部、超卖等成本优化手段,助力客户降本增效,释放云原生最大价值。
去报名 -
产教融合专家大讲堂·第①期《高校人才培养创新模式经验分享》
2024/04/25 周四 16:00-18:00
于晓东 上海杉达学院信息科学与技术学院副院长;崔宝才 天津电子信息职业技术学院电子与通信技术系主任
本期直播将与您一起探讨高校人才培养创新模式经验。
去报名
热门标签