在使用open fegin的Hystrix情况下,将当前线程请求传到下游时,发现从RequestContextHolder中获取到的HttpServletRequest为空。
@Override
public void apply(RequestTemplate requestTemplate) {logger.info("执行Feign拦截器,处理请求头。。。");ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestAttributes();if (Null(requestAttributes)) {HttpServletRequest request = Request();Enumeration<String> headerNames = HeaderNames();while (headerNames.hasMoreElements()){String nextElement = Element();logger.info("Header[{}={}]", nextElement, Header(nextElement));requestTemplate.header(nextElement, Header(nextElement));}}
}
RequestContextHolder请求上下文持有者,我们可以在当前线程的任意位置,通过这个类来获取到当前请求的RequestAttributes,但是有个问题,其请求对象是保存在ThreadLocal中的,我们Hystrix去请求另一个服务接口是通过重开子线程的,因此我们子线程中想要获取RequestAttributes自然是获取不到的。
public abstract class RequestContextHolder {private static final boolean jsfPresent =ClassUtils.isPresent("t.FacesContext", ClassLoader());private static final ThreadLocal<RequestAttributes> requestAttributesHolder =new NamedThreadLocal<>("Request attributes");private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder =new NamedInheritableThreadLocal<>("Request context");//...省略...
}
HystrixInvocationHandler处理器中可以看到,它是利用子线程来请求的。
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)throws Throwable {// early exit if the invoked method is from java.lang.Object// code is the same as ReflectiveFeign.FeignInvocationHandlerif ("equals".Name())) {try {Object otherHandler =args.length > 0 && args[0] != null ? InvocationHandler(args[0]) : null;return equals(otherHandler);} catch (IllegalArgumentException e) {return false;}} else if ("hashCode".Name())) {return hashCode();} else if ("toString".Name())) {return toString();}HystrixCommand<Object> hystrixCommand =new HystrixCommand<Object>((method)) {@Overrideprotected Object run() throws Exception {try {return HystrixInvocationHandler.(method).invoke(args);} catch (Exception e) {throw e;} catch (Throwable t) {throw (Error) t;}}@Overrideprotected Object getFallback() {if (fallbackFactory == null) {Fallback();}try {Object fallback = ate(getExecutionException());Object result = (method).invoke(fallback, args);if (isReturnsHystrixCommand(method)) {return ((HystrixCommand) result).execute();} else if (isReturnsObservable(method)) {// Create a cold Observablereturn ((Observable) result).toBlocking().first();} else if (isReturnsSingle(method)) {// Create a cold Observable as a Singlereturn ((Single) result).toObservable().toBlocking().first();} else if (isReturnsCompletable(method)) {((Completable) result).await();return null;} else if (isReturnsCompletableFuture(method)) {return ((Future) result).get();} else {return result;}} catch (IllegalAccessException e) {// shouldn't happen as method is public due to being an interfacethrow new AssertionError(e);} catch (InvocationTargetException | ExecutionException e) {// Exceptions on fallback are tossed by Hystrixthrow new Cause());} catch (InterruptedException e) {// Exceptions on fallback are tossed by HystrixThread.currentThread().interrupt();throw new Cause());}}};if (Util.isDefault(method)) {ute();} else if (isReturnsHystrixCommand(method)) {return hystrixCommand;} else if (isReturnsObservable(method)) {// Create a cold Observable();} else if (isReturnsSingle(method)) {// Create a cold Observable as a Observable().toSingle();} else if (isReturnsCompletable(method)) {Observable().toCompletable();} else if (isReturnsCompletableFuture(method)) {return new ObservableCompletableFuture<>(hystrixCommand);}ute();
}
Hystrix提供了两个隔离策略:THREAD、SEMAPHORE。其默认的策略为THREAD(线程池)。
修改隔离策略为SEMAPHORE,这个方案就不说了,毕竟官网也不推荐使用这种模式!
默认策略实现类为HystrixConcurrencyStrategyDefault,可以看看它源码,基本就是空对象。我们这里不用它这个默认的实现类,自己实现HystrixConcurrencyStrategy类,重写其#wrapCallable方法,方法内逻辑就是重开子线程之前将主线程的请求设置为可被子线程继承**(注意:继承的请求还是同一个,如果主线程和子线程存在异步情况,主线程结束后请求被销毁,子线程同样获取不到了)**。上代码:
package com.pcliu.platform.setting.openfeign;import comflix.urrency.HystrixConcurrencyStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.t.request.RequestAttributes;
import org.t.request.RequestContextHolder;import urrent.Callable;/*** @author pcliu* @version 1.0* @date 2022/6/30 17:56*/
@Component
public class RequestAttributeHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {public static final Logger logger = Logger(RequestAttributeHystrixConcurrencyStrategy.class);@Overridepublic <T> Callable<T> wrapCallable(Callable<T> callable) {RequestAttributes currentRequestAttributes = RequestContextHolder.currentRequestAttributes();RequestContextHolder.setRequestAttributes(currentRequestAttributes, Boolean.TRUE);logger.info("子线程继承父线程请求");return callable;}
}
还没有完,光实现了还不行,还得用上,很简单l添加个配置项:
hystrix:plugin:HystrixConcurrencyStrategy:implementation: com.pcliu.platform.setting.openfeign.RequestAttributeHystrixConcurrencyStrategy
OK~这里就可以了
HystrixPlugins插件在获取当前策略时,是会先加载程序员配置的策略实现类,找不到才会加载默认策略实现。
public HystrixConcurrencyStrategy getConcurrencyStrategy() {if (() == null) {// check for an implementation from Archaius firstObject impl = getPluginImplementation(HystrixConcurrencyStrategy.class);if (impl == null) {// nothing set via Archaius so initialize with defaultconcurrencyStrategypareAndSet(null, Instance());// we don't return from here but call get() again in case of thread-race so the winner will always get returned} else {// we received an implementation from Archaius so use itconcurrencyStrategypareAndSet(null, (HystrixConcurrencyStrategy) impl);}}();
}
看这个方法==Object impl = getPluginImplementation(HystrixConcurrencyStrategy.class);,然后进入这个方法T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties);,这里就是找程序员配置的策略实现类,然后加载进来:String propertyName = “hystrix.plugin.” + classSimpleName + “.implementation”;。除了可以定义策略插件,也可以定义其他各种插件。
private <T> T getPluginImplementation(Class<T> pluginClass) {T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties);if (p != null) return p; return findService(pluginClass, classLoader);
}private static <T> T getPluginImplementationViaProperties(Class<T> pluginClass, HystrixDynamicProperties dynamicProperties) {String classSimpleName = SimpleName();// Check Archaius for plugin class.String propertyName = "hystrix.plugin." + classSimpleName + ".implementation";String implementingClass = String(propertyName, null).get();if (implementingClass != null) {try {Class<?> cls = Class.forName(implementingClass);// narrow the scope (cast) to the type we're expectingcls = cls.asSubclass(pluginClass);return (T) wInstance();} catch (ClassCastException e) {throw new RuntimeException(classSimpleName + " implementation is not an instance of " + classSimpleName + ": " + implementingClass);} catch (ClassNotFoundException e) {throw new RuntimeException(classSimpleName + " implementation class not found: " + implementingClass, e);} catch (InstantiationException e) {throw new RuntimeException(classSimpleName + " implementation not able to be instantiated: " + implementingClass, e);} catch (IllegalAccessException e) {throw new RuntimeException(classSimpleName + " implementation not able to be accessed: " + implementingClass, e);}} else {return null;}
}
其实分析这个方法==Object impl = getPluginImplementation(HystrixConcurrencyStrategy.class);,如果程序员没有配置插件实现类,它最后返回的是return findService(pluginClass, classLoader);,翻看代码发现,其实它还提供了SPI的方式来加载自定义插件。
private static <T> T findService(Class<T> spi, ClassLoader classLoader) throws ServiceConfigurationError {ServiceLoader<T> sl = ServiceLoader.load(spi,classLoader);for (T s : sl) {if (s != null)return s;}return null;
}
到此结束~
本文发布于:2024-02-04 04:44:53,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170699342652167.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |