Есть ли ExecutorService, который использует текущий поток?

94

Мне нужен совместимый способ настроить использование пула потоков или нет. В идеале остальная часть кода вообще не должна подвергаться влиянию. Я мог бы использовать пул потоков с одним потоком, но это не совсем то, что я хочу. Любые идеи?

ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);

// es.execute / es.submit / new ExecutorCompletionService(es) etc
Майкл Резерферд
источник

Ответы:

70

Вот действительно простая Executor(не ExecutorServiceзаметьте) реализация, которая использует только текущий поток. Похищение этого из "Java Concurrency in Practice" (важное чтение).

public class CurrentThreadExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}

ExecutorService - это более сложный интерфейс, но с ним можно справиться таким же подходом.

слишком много думать
источник
4
+1: Как вы говорите, ExecutorService может обрабатываться таким же образом, возможно, путем создания подкласса AbstractExecutorService.
Пол Кейджер 05
@Paul Ага, AbstractExecutorServiceпохоже, кстати.
overthink
15
В Java8 вы можете сократить это до простогоRunnable::run
Джон Фридман
@Juude он всегда будет запускаться в потоке, вызывающем исполнителя.
Густав Карлссон
Разве не суть исполнителя в том же потоке, чтобы иметь возможность планировать больше задач из execute ()? Этот ответ не годится. Я не могу найти ответ, который бы удовлетворил это.
haelix
82

Вы можете использовать Guava MoreExecutors.newDirectExecutorService()или, MoreExecutors.directExecutor()если вам не нужен ExecutorService.

Если включение Guava слишком тяжело, вы можете реализовать что-то почти такое же хорошее:

public final class SameThreadExecutorService extends ThreadPoolExecutor {
  private final CountDownLatch signal = new CountDownLatch(1);

  private SameThreadExecutorService() {
    super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }

  @Override public void shutdown() {
    super.shutdown();
    signal.countDown();
  }

  public static ExecutorService getInstance() {
    return SingletonHolder.instance;
  }

  private static class SingletonHolder {
    static ExecutorService instance = createInstance();    
  }

  private static ExecutorService createInstance() {
    final SameThreadExecutorService instance
        = new SameThreadExecutorService();

    // The executor has one worker thread. Give it a Runnable that waits
    // until the executor service is shut down.
    // All other submitted tasks will use the RejectedExecutionHandler
    // which runs tasks using the  caller's thread.
    instance.submit(new Runnable() {
        @Override public void run() {
          boolean interrupted = false;
          try {
            while (true) {
              try {
                instance.signal.await();
                break;
              } catch (InterruptedException e) {
                interrupted = true;
              }
            }
          } finally {
            if (interrupted) {
              Thread.currentThread().interrupt();
            }
          }
        }});
    return Executors.unconfigurableScheduledExecutorService(instance);
  }
}
НамшубПисатель
источник
1
Для Android это return Executors.unconfigurableExecutorService (instance);
Maragues
если все, что мы используем, - это текущий поток , почему примитивы синхронизации? почему защелка?
haelix
@haelix защелка необходима, потому что даже если работа выполняется в том же потоке, что и тот, который ее добавил, любой поток может завершить работу исполнителя.
NamshubWriter
64

Стиль Java 8:

Executor e = Runnable::run;

lpandzic
источник
8
Абсолютно грязно. Я люблю это.
Rogue
Что в этом гадости? Это элегантно :)
lpandzic
1
Это лучший вид грязного @Ipandzic, он необычный и лаконичный.
Rogue
12

Я написал на ExecutorServiceоснове AbstractExecutorService.

/**
 * Executes all submitted tasks directly in the same thread as the caller.
 */
public class SameThreadExecutorService extends AbstractExecutorService {

    //volatile because can be viewed by other threads
    private volatile boolean terminated;

    @Override
    public void shutdown() {
        terminated = true;
    }

    @Override
    public boolean isShutdown() {
        return terminated;
    }

    @Override
    public boolean isTerminated() {
        return terminated;
    }

    @Override
    public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException {
        shutdown(); // TODO ok to call shutdown? what if the client never called shutdown???
        return terminated;
    }

    @Override
    public List<Runnable> shutdownNow() {
        return Collections.emptyList();
    }

    @Override
    public void execute(Runnable theCommand) {
        theCommand.run();
    }
}
Эрик Обермюльнер
источник
завершенное поле не защищено с помощью synchronized.
Даниил Яицков
1
Поле @ DaneelS.Yaitskov terminatedне получит выгоды от синхронного доступа на основе кода, который на самом деле здесь. Операции с 32-битными полями в Java атомарны.
Кристофер Шульц
Я полагаю, что приведенный выше метод isTerminated () не совсем правильный, потому что isTerminated () должен возвращать истину только в том случае, если в данный момент нет выполняемых задач. Guava отслеживает количество задач в другой переменной, вероятно, поэтому они защищают обе переменные блокировкой.
Джереми К.
7

Вы можете использовать RejectedExecutionHandler для запуска задачи в текущем потоке.

public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        r.run();
    }
});

Вам понадобится только один из них.

Питер Лоури
источник
Умная! Насколько это безопасно (честный вопрос)? Есть ли способы отклонения задачи, когда вы действительно не хотели бы выполнять ее в текущем потоке? Отклоняются ли задачи, если ExecutorService завершает работу или завершается?
overthink
Поскольку максимальный размер равен 0, каждая задача отклоняется. Однако отклоненное поведение должно выполняться в текущем потоке. Проблема будет только в том случае, если задача НЕ будет отклонена.
Питер Лоури 05
8
Обратите внимание, что реализация этой политики уже существует, нет необходимости определять свою собственную java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy.
jtahlborn 05
7
Больше невозможно создать ThreadPoolExecutor с максимальным размером пула 0. Я думаю, можно было бы воспроизвести поведение, используя blockingQueue размера 0, но никакая реализация по умолчанию, похоже, не позволяет этого.
Axelle Ziegler
который не компилируется из-за {code} if (corePoolSize <0 || maximumPoolSize <= 0 || maximumPoolSize <corePoolSize || keepAliveTime <0) {code} в java.util.ThreadPoolExecutor (как минимум openJdk 7)
Богдан
7

Мне пришлось использовать тот же «CurrentThreadExecutorService» для целей тестирования, и, хотя все предложенные решения были хорошими (особенно то, которое упоминает способ Guava ), я придумал нечто похожее на то, что предложил здесь Питер Лоури .

Как отметил Аксель Ziegler здесь , к сожалению , решение Петра будет на самом деле не работает из - за проверки введенных в ThreadPoolExecutorпо maximumPoolSizeпараметру конструктора (т.е. maximumPoolSizeне может быть <=0).

Чтобы обойти это, я сделал следующее:

private static ExecutorService currentThreadExecutorService() {
    CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
    return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) {
        @Override
        public void execute(Runnable command) {
            callerRunsPolicy.rejectedExecution(command, this);
        }
    };
}
фабрициокуччи
источник