수영장 디자인 아이디어
수영장 디자인은 새로운 용어가 되어서는 안 됩니다. Java 스레드 풀, JDBC 연결 풀, Redis 연결 풀 등과 같은 일반적인 것들은 이러한 유형의 디자인을 대표하는 구현입니다.
이 디자인은 초기에 리소스를 사전 설정하고 해결된 문제는 스레드 생성 비용, 원격 연결 획득 비용 등 각 리소스 획득에 대한 소비를 상쇄하는 것입니다. 매점에 음식을 사러 갈 때처럼, 음식을 준비하는 이모가 먼저 거기에 밥 몇 개를 넣어줄 것입니다. 올 때 도시락을 들고 야채를 추가할 필요가 없습니다. 밥과 야채를 동시에 준비하면 더 효율적입니다.
풀링 설계에는 리소스 초기화 외에도 풀의 초기 값, 풀의 활성 값, 풀의 최대 값 등의 기능도 포함됩니다. 이러한 기능은 멤버 속성에 직접 매핑될 수 있습니다. Java 스레드 풀 및 데이터베이스 연결 풀.
거부 정책을 실행하는 스레드 풀의 타이밍
은 데이터 소스 연결 풀과 다릅니다. 스레드 풀에는 풀의 초기 크기 및 최대값 외에 버퍼링을 위한 추가 차단 대기열도 있습니다. .
데이터 소스 연결 풀은 일반적으로 요청된 연결 수가 연결 풀의 최대 값을 초과할 때 거부 정책을 실행합니다. 정책은 일반적으로 설정된 시간 동안 차단하고 기다리거나 직접 예외를 발생시키는 것입니다.
그림과 같이 스레드 풀이 언제 거부를 트리거하는지 알고 싶다면 위 세 매개변수의 구체적인 의미를 명확히 해야 합니다. 오히려 이 세 매개변수의 전반적인 조정의 결과입니다. 단순히 최대 스레드 수를 초과했을 때 트리거하는 것보다 스레드 거부는 대략적입니다. 제출된 작업 수가 corePoolSize보다 큰 경우에만 버퍼가 채워진 후에만 큐 버퍼에 배치됩니다. 현재 실행 중인 작업이 maxPoolSize보다 큽니다. maxPoolSize보다 작으면 처리를 위해 새 스레드가 생성됩니다. 값보다 크면 거부 정책이 트리거됩니다.
요약은 다음과 같습니다. 현재 제출된 작업 수가 (maxPoolSize + queueCapacity)보다 크면 스레드 풀의 거부 정책이 트리거됩니다.
JDK에는 4가지 내장 스레드 풀 거부 전략이 있습니다
거부 전략 인터페이스 정의
JDK와 함께 제공되는 스레드 풀 거부 전략을 분석하기 전에 먼저 JDK에서 정의한 거부 전략 인터페이스를 살펴보세요.
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
인터페이스 정의는 매우 명확합니다. 거부 정책이 트리거되면 스레드 풀은 사용자가 설정한 특정 정책을 호출하고 처리를 위해 현재 제출된 작업과 스레드 풀 인스턴스 자체를 사용자에게 전달합니다. 다음은 JDK에 내장된 구현을 살펴보는 것입니다.
CallerRunsPolicy(호출자 실행 정책)
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
Function: 거부 정책이 트리거되는 경우 스레드 풀이 닫히지 않으면 작업을 제출한 현재 스레드에 의해 처리됩니다.
사용 시나리오: 일반적으로 실패가 허용되지 않고 성능 요구 사항이 높지 않으며 동시성이 작은 시나리오에서 사용됩니다. 이는 스레드 풀이 정상적인 상황에서 닫히지 않기 때문입니다. 즉, 제출된 작업이 확실히 실행됩니다. , 그러나 호출 스레드 자체에 의해 실행되기 때문에 작업이 여러 번 제출되면 후속 작업의 실행이 차단되고 성능과 효율성이 자연스럽게 느려집니다.
AbortPolicy(중단 정책)
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
기능: 거부 정책이 트리거되면 실행을 거부하는 예외가 직접 발생합니다. 중단 정책의 의미는 현재 실행 프로세스를 중단하는 것입니다
사용 시나리오. : 이것입니다. 특별한 시나리오는 없지만, 한 가지 중요한 것은 발생한 예외를 올바르게 처리하는 것입니다.
ThreadPoolExecutor의 기본 정책은 AbortPolicy입니다. ExecutorService 인터페이스의 일련의 ThreadPoolExecutor에는 명시적인 거부 정책이 없으므로 기본 정책은 다음과 같습니다.
하지만 ExecutorService의 스레드 풀 인스턴스 큐는 제한이 없으므로 메모리가 가득 차도 거부 정책이 트리거되지 않습니다. 스레드 풀 인스턴스를 사용자 정의하는 경우 이 전략을 사용할 때 전략이 트리거될 때 발생하는 예외를 처리해야 합니다. 이는 현재 실행 프로세스를 중단시키기 때문입니다.
DiscardPolicy(정책 폐기)
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
기능: 작업을 트리거하지 않고 이 작업을 직접 자동으로 취소합니다.
사용 시나리오: 제출한 작업이 중요하지 않은 경우 사용할 수 있습니다. 왜냐하면 이는 빈 구현이고 작업을 자동으로 삼켜버릴 것이기 때문입니다. 따라서 이 전략은 기본적으로 더 이상 필요하지 않습니다
DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
기능:스레드 풀이 닫히지 않은 경우 대기열의 헤드에 요소를 팝업한 다음 실행을 시도하세요
사용 시나리오: 이 정책 작업은 계속 폐기되며 폐기 시 소리가 나지 않지만, 오래되고 실행되지 않은 작업은 폐기되며 실행 우선순위가 더 높은 작업인 것이 특징입니다.
이 기능을 바탕으로 제가 생각할 수 있는 시나리오는 메시지를 게시하고 메시지를 수정하는 것입니다. 이때 메시지가 아직 실행되지 않았습니다. 실행되지 않은 메시지의 버전은 현재 제출된 메시지보다 이전 버전입니다. 메시지 버전이 낮으면 삭제할 수 있습니다. 실행을 위해 대기할 큐에 더 낮은 버전의 메시지가 있을 수 있으므로 실제로 메시지를 처리할 때 메시지 버전을 비교해야 합니다.
제3자에 의한 거부 전략
dubbo中的线程拒绝策略
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); private final String threadName; private final URL url; private static volatile long lastPrintTime = 0; private static Semaphore guard = new Semaphore(1); public AbortPolicyWithReport(String threadName, URL url) { this.threadName = threadName; this.url = url; } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); dumpJStack(); throw new RejectedExecutionException(msg); } private void dumpJStack() { //省略实现 } }
可以看到,当dubbo的工作线程触发了线程拒绝后,主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因。
1)输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息。可以说,这条日志,使用dubbo的有过生产运维经验的或多或少是见过的,这个日志简直就是日志打印的典范,其他的日志打印的典范还有spring。得益于这么详细的日志,可以很容易定位到问题所在
2)输出当前线程堆栈详情,这个太有用了,当你通过上面的日志信息还不能定位问题时,案发现场的dump线程上下文信息就是你发现问题的救命稻草。
3)继续抛出拒绝执行异常,使本次任务失败,这个继承了JDK默认拒绝策略的特性
Netty中的线程池拒绝策略
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler { NewThreadRunsPolicy() { super(); } public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { final Thread t = new Thread(r, "Temporary task executor"); t.start(); } catch (Throwable e) { throw new RejectedExecutionException( "Failed to start a new thread", e); } } }
Netty中的实现很像JDK中的CallerRunsPolicy,舍不得丢弃任务。不同的是,CallerRunsPolicy是直接在调用者线程执行的任务。而 Netty是新建了一个线程来处理的。
所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了。但是也要注意一点,Netty的实现里,在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理,直到new不出新的线程了,才会抛创建线程失败的异常。
activeMq中的线程池拒绝策略
new RejectedExecutionHandler() { @Override public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { try { executor.getQueue().offer(r, 60, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker"); } throw new RejectedExecutionException("Timed Out while attempting to enqueue Task."); } });
activeMq中的策略属于最大努力执行任务型,当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列,当一分钟超时还没成功时,就抛出异常
pinpoint中的线程池拒绝策略
public class RejectedExecutionHandlerChain implements RejectedExecutionHandler { private final RejectedExecutionHandler[] handlerChain; public static RejectedExecutionHandler build(List<rejectedexecutionhandler> chain) { Objects.requireNonNull(chain, "handlerChain must not be null"); RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]); return new RejectedExecutionHandlerChain(handlerChain); } private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) { this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null"); } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) { rejectedExecutionHandler.rejectedExecution(r, executor); } } }</rejectedexecutionhandler>
pinpoint的拒绝策略实现很有特点,和其他的实现都不同。他定义了一个拒绝策略链,包装了一个拒绝策略列表,当触发拒绝策略时,会将策略链中的rejectedExecution依次执行一遍。
위 내용은 Java 스레드 풀의 거부 정책은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!