/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka.util.batcher;

import com.netflix.eureka.util.batcher.AcceptorExecutor;
import com.netflix.eureka.util.batcher.TaskHolder;
import com.netflix.eureka.util.batcher.TaskProcessor;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.StatsTimer;
import com.netflix.servo.stats.StatsConfig;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TaskExecutors<ID, T> {
    /** Logger shared by this class and its nested worker / metrics types. */
    private static final Logger logger = LoggerFactory.getLogger(TaskExecutors.class);

    /**
     * Metrics objects keyed by the executor name they were created for. Entries are added by the
     * static factory methods and walked by {@link #shutdown()} to unregister them.
     * NOTE(review): this is a plain, statically shared {@link HashMap} with no synchronization —
     * confirm that factories and shutdown are never invoked concurrently.
     */
    private static final Map<String, TaskExecutorMetrics> registeredMonitors = new HashMap<>();

    /** Shutdown flag; the factories hand the same instance to every worker runnable. */
    private final AtomicBoolean isShutdown;

    /** Worker threads created and started by the constructor; interrupted on shutdown. */
    private final List<Thread> workerThreads;

    /**
     * Creates {@code workerCount} worker threads and starts them immediately.
     *
     * <p>Each thread runs the runnable returned by {@code workerRunnableFactory.create(index)},
     * is named after {@link WorkerRunnable#getWorkerName()}, belongs to a new thread group called
     * {@code "eurekaTaskExecutors"} and is marked as a daemon before it is started.
     *
     * @param workerRunnableFactory supplies one runnable per worker index (0-based)
     * @param workerCount           number of worker threads to create; no threads are created when it is
     *                              zero or negative
     * @param isShutdown            flag flipped by {@link #shutdown()}
     */
    TaskExecutors(WorkerRunnableFactory<ID, T> workerRunnableFactory, int workerCount, AtomicBoolean isShutdown) {
        this.isShutdown = isShutdown;
        this.workerThreads = new ArrayList<>();
        final ThreadGroup group = new ThreadGroup("eurekaTaskExecutors");
        int index = 0;
        while (index < workerCount) {
            WorkerRunnable<ID, T> worker = workerRunnableFactory.create(index);
            Thread thread = new Thread(group, worker, worker.getWorkerName());
            this.workerThreads.add(thread);
            thread.setDaemon(true);
            thread.start();
            index++;
        }
    }

    /**
     * Stops this executor. Only the first call has any effect: the shutdown flag is flipped with a
     * compare-and-set, and later calls return immediately.
     *
     * <p>On the effective call every worker thread is interrupted, then every entry currently held in
     * the static monitor map is passed to {@link Monitors#unregisterObject}.
     * NOTE(review): that map is shared by all executors and is not cleared here, so this unregisters
     * the monitors of every executor created so far, not just this one — confirm this is intended.
     */
    void shutdown() {
        if (!this.isShutdown.compareAndSet(false, true)) {
            return;
        }
        this.workerThreads.forEach(Thread::interrupt);
        registeredMonitors.forEach((monitorId, monitored) -> Monitors.unregisterObject(monitorId, monitored));
    }

    /**
     * Builds an executor whose workers process one task at a time.
     *
     * <p>A new {@link TaskExecutorMetrics} is created for {@code name} and stored in the static monitor
     * map (replacing any entry already stored under that name). Every worker is a
     * {@link SingleTaskWorkerRunnable} named {@code "TaskNonBatchingWorker-" + name + '-' + index};
     * all workers share one shutdown flag, one metrics object, the processor and the acceptor executor.
     *
     * @param name             executor name, used for the metrics id and the worker thread names
     * @param workerCount      number of worker threads to start
     * @param processor        processes each task
     * @param acceptorExecutor source of work items and target of reprocess requests
     * @return the started executor
     */
    static <ID, T> TaskExecutors<ID, T> singleItemExecutors(String name, int workerCount, TaskProcessor<T> processor, AcceptorExecutor<ID, T> acceptorExecutor) {
        final AtomicBoolean isShutdown = new AtomicBoolean();
        final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
        registeredMonitors.put(name, metrics);
        // Diamond instead of raw types: the type arguments are inferred from the return type,
        // so the construction is checked by the compiler instead of producing unchecked warnings.
        return new TaskExecutors<>(
                idx -> new SingleTaskWorkerRunnable<>(
                        "TaskNonBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor),
                workerCount,
                isShutdown);
    }

    /**
     * Builds an executor whose workers process tasks in batches.
     *
     * <p>A new {@link TaskExecutorMetrics} is created for {@code name} and stored in the static monitor
     * map (replacing any entry already stored under that name). Every worker is a
     * {@link BatchWorkerRunnable} named {@code "TaskBatchingWorker-" + name + '-' + index};
     * all workers share one shutdown flag, one metrics object, the processor and the acceptor executor.
     *
     * @param name             executor name, used for the metrics id and the worker thread names
     * @param workerCount      number of worker threads to start
     * @param processor        processes each batch of tasks
     * @param acceptorExecutor source of work batches and target of reprocess requests
     * @return the started executor
     */
    static <ID, T> TaskExecutors<ID, T> batchExecutors(String name, int workerCount, TaskProcessor<T> processor, AcceptorExecutor<ID, T> acceptorExecutor) {
        final AtomicBoolean isShutdown = new AtomicBoolean();
        final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
        registeredMonitors.put(name, metrics);
        // Diamond instead of raw types: the type arguments are inferred from the return type,
        // so the construction is checked by the compiler instead of producing unchecked warnings.
        return new TaskExecutors<>(
                idx -> new BatchWorkerRunnable<>(
                        "TaskBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor),
                workerCount,
                isShutdown);
    }

    static class SingleTaskWorkerRunnable<ID, T>
    extends WorkerRunnable<ID, T> {
        SingleTaskWorkerRunnable(String workerName, AtomicBoolean isShutdown, TaskExecutorMetrics metrics, TaskProcessor<T> processor, AcceptorExecutor<ID, T> acceptorExecutor) {
            super(workerName, isShutdown, metrics, processor, acceptorExecutor);
        }

        @Override
        public void run() {
            try {
                while (!this.isShutdown.get()) {
                    TaskHolder taskHolder;
                    BlockingQueue workQueue = this.taskDispatcher.requestWorkItem();
                    while ((taskHolder = workQueue.poll(1L, TimeUnit.SECONDS)) == null) {
                        if (!this.isShutdown.get()) continue;
                        return;
                    }
                    this.metrics.registerExpiryTime(taskHolder);
                    if (taskHolder == null) continue;
                    TaskProcessor.ProcessingResult result = this.processor.process(taskHolder.getTask());
                    switch (result) {
                        case Success: {
                            break;
                        }
                        case TransientError: 
                        case Congestion: {
                            this.taskDispatcher.reprocess(taskHolder, result);
                            break;
                        }
                        case PermanentError: {
                            logger.warn("Discarding a task of {} due to permanent error", (Object)this.workerName);
                        }
                    }
                    this.metrics.registerTaskResult(result, 1);
                }
            }
            catch (InterruptedException workQueue) {
            }
            catch (Throwable e) {
                logger.warn("Discovery WorkerThread error", e);
            }
        }
    }

    static class BatchWorkerRunnable<ID, T>
    extends WorkerRunnable<ID, T> {
        BatchWorkerRunnable(String workerName, AtomicBoolean isShutdown, TaskExecutorMetrics metrics, TaskProcessor<T> processor, AcceptorExecutor<ID, T> acceptorExecutor) {
            super(workerName, isShutdown, metrics, processor, acceptorExecutor);
        }

        @Override
        public void run() {
            try {
                while (!this.isShutdown.get()) {
                    List<TaskHolder<ID, T>> holders = this.getWork();
                    this.metrics.registerExpiryTimes(holders);
                    List<T> tasks = this.getTasksOf(holders);
                    TaskProcessor.ProcessingResult result = this.processor.process(tasks);
                    switch (result) {
                        case Success: {
                            break;
                        }
                        case TransientError: 
                        case Congestion: {
                            this.taskDispatcher.reprocess(holders, result);
                            break;
                        }
                        case PermanentError: {
                            logger.warn("Discarding {} tasks of {} due to permanent error", (Object)holders.size(), (Object)this.workerName);
                        }
                    }
                    this.metrics.registerTaskResult(result, tasks.size());
                }
            }
            catch (InterruptedException holders) {
            }
            catch (Throwable e) {
                logger.warn("Discovery WorkerThread error", e);
            }
        }

        private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
            ArrayList result;
            BlockingQueue workQueue = this.taskDispatcher.requestWorkItems();
            do {
                result = workQueue.poll(1L, TimeUnit.SECONDS);
            } while (!this.isShutdown.get() && result == null);
            return result == null ? new ArrayList() : result;
        }

        private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
            ArrayList<T> tasks = new ArrayList<T>(holders.size());
            for (TaskHolder<ID, T> holder : holders) {
                tasks.add(holder.getTask());
            }
            return tasks;
        }
    }

    /**
     * Base class of the worker runnables: holds the collaborators every worker needs and leaves
     * {@link Runnable#run()} to the subclasses.
     */
    abstract static class WorkerRunnable<ID, T> implements Runnable {
        /** Name of this worker; also used as the name of the thread running it. */
        final String workerName;
        /** Flag the worker loops check to know when to stop. */
        final AtomicBoolean isShutdown;
        /** Sink for waiting-time and result statistics. */
        final TaskExecutorMetrics metrics;
        /** Executes the tasks. */
        final TaskProcessor<T> processor;
        /** Supplies work queues and accepts tasks that must be reprocessed. */
        final AcceptorExecutor<ID, T> taskDispatcher;

        WorkerRunnable(String workerName, AtomicBoolean isShutdown, TaskExecutorMetrics metrics, TaskProcessor<T> processor, AcceptorExecutor<ID, T> taskDispatcher) {
            this.taskDispatcher = taskDispatcher;
            this.processor = processor;
            this.metrics = metrics;
            this.isShutdown = isShutdown;
            this.workerName = workerName;
        }

        /** @return the name given to this worker at construction time */
        String getWorkerName() {
            return workerName;
        }
    }

    static interface WorkerRunnableFactory<ID, T> {
        public WorkerRunnable<ID, T> create(int var1);
    }

    /**
     * Counters and a waiting-time timer for one named executor. A single instance is shared by all
     * worker threads of that executor, so updates to the counters are serialized.
     */
    static class TaskExecutorMetrics {
        // The counters are only written inside the synchronized registerTaskResult(); they stay
        // volatile so that reads made without the lock still see the latest value.
        @Monitor(name="eurekaServer.replication.numberOfSuccessfulExecutions", description="Number of successful task executions", type=DataSourceType.COUNTER)
        volatile long numberOfSuccessfulExecutions;
        @Monitor(name="eurekaServer.replication.numberOfTransientErrors", description="Number of transient task execution errors", type=DataSourceType.COUNTER)
        volatile long numberOfTransientError;
        @Monitor(name="eurekaServer.replication.numberOfPermanentErrors", description="Number of permanent task execution errors", type=DataSourceType.COUNTER)
        volatile long numberOfPermanentError;
        @Monitor(name="eurekaServer.replication.numberOfCongestionIssues", description="Number of congestion issues during task execution", type=DataSourceType.COUNTER)
        volatile long numberOfCongestionIssues;
        /** Records how long tasks waited between submission and being picked up by a worker. */
        final StatsTimer taskWaitingTimeForProcessing;

        /**
         * Builds the timer (sample size 1000; percentiles 50, 95, 99 and 99.5; standard deviation
         * published) and tries to register this object with Servo under {@code id}. A registration
         * failure is logged and otherwise ignored.
         *
         * @param id identifier to register the monitor under
         */
        TaskExecutorMetrics(String id) {
            double[] percentiles = new double[]{50.0, 95.0, 99.0, 99.5};
            StatsConfig statsConfig = new StatsConfig.Builder().withSampleSize(1000).withPercentiles(percentiles).withPublishStdDev(true).build();
            MonitorConfig config = MonitorConfig.builder("eurekaServer.replication.executionTime").build();
            this.taskWaitingTimeForProcessing = new StatsTimer(config, statsConfig);
            try {
                Monitors.registerObject(id, this);
            }
            catch (Throwable e) {
                logger.warn("Cannot register servo monitor for this object", e);
            }
        }

        /**
         * Adds {@code count} to the counter matching {@code result}.
         *
         * <p>Synchronized because {@code +=} on a volatile field is a separate read and write; with
         * several worker threads reporting into the same instance, unsynchronized increments could be
         * lost.
         *
         * @param result outcome reported by the processor
         * @param count  number of tasks that outcome applies to
         */
        synchronized void registerTaskResult(TaskProcessor.ProcessingResult result, int count) {
            switch (result) {
                case Success: {
                    this.numberOfSuccessfulExecutions += count;
                    break;
                }
                case TransientError: {
                    this.numberOfTransientError += count;
                    break;
                }
                case PermanentError: {
                    this.numberOfPermanentError += count;
                    break;
                }
                case Congestion: {
                    this.numberOfCongestionIssues += count;
                    break;
                }
            }
        }

        /**
         * Records the time elapsed between the holder's submit timestamp and now.
         *
         * @param holder holder that is about to be processed
         */
        <ID, T> void registerExpiryTime(TaskHolder<ID, T> holder) {
            this.taskWaitingTimeForProcessing.record(System.currentTimeMillis() - holder.getSubmitTimestamp(), TimeUnit.MILLISECONDS);
        }

        /**
         * Records the waiting time of every holder, measured against a single "now" taken once for
         * the whole batch.
         *
         * @param holders holders that are about to be processed
         */
        <ID, T> void registerExpiryTimes(List<TaskHolder<ID, T>> holders) {
            long now = System.currentTimeMillis();
            for (TaskHolder<ID, T> holder : holders) {
                this.taskWaitingTimeForProcessing.record(now - holder.getSubmitTimestamp(), TimeUnit.MILLISECONDS);
            }
        }
    }
}

