The `HeapBufferedAsyncResponseConsumer` class in the Elasticsearch client package takes a `bufferLimit` value. It is the default implementation of `org.apache.http.nio.protocol.HttpAsyncResponseConsumer`: it buffers the entire response body in heap memory, so the buffer grows to the response's content length, and a configurable limit caps how large a response may be read. If the entity is longer than the configured buffer limit, an exception is thrown.
This value therefore caps how much data an ES query can return. The default is 100 MB (100 * 1024 * 1024 = 104857600 bytes); if the query result exceeds that, the request fails with an error like:
```
java.util.concurrent.ExecutionException: org.apache.http.ContentTooLongException: entity content is too long [109077969] for the configured buffer limit [104857600]
```
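As a sanity check on the numbers in that log, this minimal self-contained snippet confirms that the logged limit is exactly 100 MB and that the response entity did not fit:

```java
public class BufferLimitMath {
    public static void main(String[] args) {
        // Default buffer limit used by HeapBufferedAsyncResponseConsumer: 100 MB
        int defaultLimit = 100 * 1024 * 1024;
        System.out.println(defaultLimit);             // 104857600, the limit in the error log
        System.out.println(109077969 > defaultLimit); // true: the 109 MB entity exceeds it
    }
}
```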
Inspection shows that `bufferLimit` is declared `final`. Its value originally comes in through the `HttpAsyncResponseConsumerFactory` held by `RequestOptions`, whose `DEFAULT_BUFFER_LIMIT` is a fixed constant. The workaround is to use reflection to replace the `httpAsyncResponseConsumerFactory` field with a new factory that supplies a larger value.
Where to run this code: anywhere, as long as it executes once at startup. You can implement `ApplicationRunner` and run it in the `run` method, or place it in a `static` initializer block of a class.
```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

import org.apache.http.HttpResponse;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.HeapBufferedAsyncResponseConsumer;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;

// Enlarge the buffer used for ES query responses
RequestOptions requestOptions = RequestOptions.DEFAULT;
Class<? extends RequestOptions> reqClass = requestOptions.getClass();
Field reqField = null;
try {
    reqField = reqClass.getDeclaredField("httpAsyncResponseConsumerFactory");
} catch (NoSuchFieldException e) {
    e.printStackTrace();
}
reqField.setAccessible(true);

// Strip the final modifier from the field
Field modifiersField = null;
try {
    modifiersField = Field.class.getDeclaredField("modifiers");
} catch (NoSuchFieldException e) {
    e.printStackTrace();
}
modifiersField.setAccessible(true);
try {
    modifiersField.setInt(reqField, reqField.getModifiers() & ~Modifier.FINAL);
} catch (IllegalAccessException e) {
    e.printStackTrace();
}

// Swap in a factory that builds a larger consumer
try {
    reqField.set(requestOptions, new HttpAsyncResponseConsumerFactory() {
        @Override
        public HttpAsyncResponseConsumer<HttpResponse> createHttpAsyncResponseConsumer() {
            // 500 MB
            return new HeapBufferedAsyncResponseConsumer(5 * 100 * 1024 * 1024);
        }
    });
} catch (IllegalAccessException e) {
    e.printStackTrace();
}
```
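Note that the `Field#modifiers` trick above stops working on JDK 12+, where that field is filtered from reflection. If you invoke the `RestHighLevelClient` directly (rather than through Spring Data, which internally uses `RequestOptions.DEFAULT` — presumably why the post mutates that constant in place), the client offers a supported, reflection-free way to raise the limit per request. A sketch, assuming the `elasticsearch-rest-client` builder API; the 500 MB figure mirrors the post:

```java
// Build per-request options with a larger heap buffer (supported API, no reflection)
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
builder.setHttpAsyncResponseConsumerFactory(
        new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(500 * 1024 * 1024));
RequestOptions options = builder.build();
// then pass the options on each call, e.g. client.search(searchRequest, options);
```

This only helps where you control the `RequestOptions` argument yourself; calls routed through Spring Data templates will still use the default.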
Code reference link: note that the first approach given there does not work, because of the `final` modifier.
A related error appears when writing large batches:

```
org.springframework.data.elasticsearch.UncategorizedElasticsearchException: Elasticsearch exception [type=es_rejected_execution_exception, reason=rejected execution of coordinating operation [coordinating_and_primary_bytes=0, replica_bytes=0, all_bytes=0, coordinating_operation_bytes=115005979, max_coordinating_and_primary_bytes=103887667]]; nested exception is ElasticsearchStatusException[Elasticsearch exception [type=es_rejected_execution_exception, reason=rejected execution of coordinating operation [coordinating_and_primary_bytes=0, replica_bytes=0, all_bytes=0, coordinating_operation_bytes=115005979, max_coordinating_and_primary_bytes=103887667]]]
```
The fix is to split the large collection into chunks and call `saveAll()` on each chunk asynchronously. In one case, 120,000 documents were split into 3 groups; the exact safe threshold is unknown.
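The splitting step can be sketched as a small partition helper. The helper name and the batch size of 40,000 are illustrative (the post only reports that 120k documents went into 3 groups):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplit {
    /** Split a list into consecutive sub-lists of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> list, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < list.size(); i += batchSize) {
            batches.add(list.subList(i, Math.min(i + batchSize, list.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> docs = new ArrayList<>();
        for (int i = 0; i < 120_000; i++) docs.add(i);
        List<List<Integer>> batches = partition(docs, 40_000);
        System.out.println(batches.size()); // 3
        // each batch would then go to repository.saveAll(batch), e.g. wrapped
        // in CompletableFuture.runAsync(...) if asynchronous writes are wanted
    }
}
```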
A supplementary note on problem 3:

```
Result window is too large, from + size must be less than or equal to: [10000] but was [78020]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level setting.
```
Two ways to fix it:

1. Set the `max_result_window` index setting via a script.
2. Create/update the index through `elasticsearchTemplate`, passing the relevant settings as the second argument.

Method 2 is recommended, since the setting is then applied and checked at project startup. For method 1, search the web yourself.
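Method 1 amounts to a single REST call against the index settings endpoint. A minimal sketch — the index name `demo`, host, and the 200,000 value are placeholders; choose a limit that fits your data:

```shell
curl -X PUT "http://localhost:9200/demo/_settings" \
  -H 'Content-Type: application/json' \
  -d '{ "index": { "max_result_window": 200000 } }'
```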
Method 2 code:
```java
@Component
public class InitEsIndex implements ApplicationRunner {

    @Autowired
    private ElasticsearchRestTemplate elasticsearchRestTemplate;

    @Autowired
    private DictEsService dictEsService;

    @Override
    public void run(ApplicationArguments args) {
        Map<String, Object> settings = new LinkedHashMap<>();
        settings.put("max_result_window", 2000000000);
        // Update the index: create it with the settings if absent, then put the mapping
        boolean checkIndex = elasticsearchRestTemplate.indexExists(Demo.class);
        if (checkIndex) {
            elasticsearchRestTemplate.putMapping(Demo.class);
        } else {
            elasticsearchRestTemplate.createIndex(Demo.class, settings);
            elasticsearchRestTemplate.putMapping(Demo.class);
        }
        // teacherEsService.syncTeacherListToEs(new TeacherQueryDto());
    }
}
```
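The error message itself also points at the scroll API; for genuinely large result sets, a scroll loop avoids deep `from + size` paging entirely, without raising `max_result_window`. A sketch against the `RestHighLevelClient` (the index name `demo`, page size, and `client` variable are placeholders; import packages vary slightly across 6.x/7.x client versions):

```java
// Page through all hits with the scroll API instead of from + size
SearchRequest searchRequest = new SearchRequest("demo");
searchRequest.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(1000));
searchRequest.scroll(TimeValue.timeValueMinutes(1L));

SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = response.getScrollId();
while (response.getHits().getHits().length > 0) {
    // process response.getHits() ...
    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
    scrollRequest.scroll(TimeValue.timeValueMinutes(1L));
    response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
    scrollId = response.getScrollId();
}
// Release server-side scroll resources when done
ClearScrollRequest clearScroll = new ClearScrollRequest();
clearScroll.addScrollId(scrollId);
client.clearScroll(clearScroll, RequestOptions.DEFAULT);
```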
Published: 2024-02-01 15:43:51.