在 Java 中衫樊,我們常常開啟多個線程來提高運(yùn)行效率。但是有時候我們需要這些線程的返回值。應(yīng)該如何獲取線程的返回值呢科侈?
我現(xiàn)在有10萬個整數(shù)载佳,我需要開5個線程來找它們的最大值。每個線程處理2萬個整數(shù)臀栈,然后返回這2萬個整數(shù)的最大值蔫慧。最終我們再找出整體的最大值。
1. 使用線程安全的共享變量
我們使用一個類似于 LinkedList 的共享變量來存放這5個線程各自找到的最大值权薯。但是 LinkedList 是線程不安全的姑躲,所以可以換為 ConcurrentLinkedQueue 。
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
public class MaxValueFinder {
public static void main(String[] args) throws InterruptedException {
int[] nums = generateNums(100000);
ConcurrentLinkedQueue<Integer> resultQueue = new ConcurrentLinkedQueue<>();
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
int startIndex = i * (nums.length / threads.length);
int endIndex = (i + 1) * (nums.length / threads.length);
threads[i] = new MaxValueThread(nums, startIndex, endIndex, resultQueue);
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
int maxValue = resultQueue.poll();
while (!resultQueue.isEmpty()) {
int value = resultQueue.poll();
if (value > maxValue) {
maxValue = value;
}
}
System.out.println("Max value: " + maxValue);
}
private static int[] generateNums(int n) {
int[] nums = new int[n];
Random random = new Random();
for (int i = 0; i < n; i++) {
nums[i] = random.nextInt();
}
return nums;
}
static class MaxValueThread extends Thread {
private int[] nums;
private int startIndex;
private int endIndex;
private ConcurrentLinkedQueue<Integer> resultQueue;
MaxValueThread(int[] nums, int startIndex, int endIndex, ConcurrentLinkedQueue<Integer> resultQueue) {
this.nums = nums;
this.startIndex = startIndex;
this.endIndex = endIndex;
this.resultQueue = resultQueue;
}
@Override
public void run() {
int maxValue = nums[startIndex];
for (int i = startIndex + 1; i < endIndex; i++) {
if (nums[i] > maxValue) {
maxValue = nums[i];
}
}
resultQueue.add(maxValue);
}
}
}
在這個示例中崭闲,我們創(chuàng)建了一個ConcurrentLinkedQueue來存儲線程的計算結(jié)果肋联。在每個線程的run方法中,線程會計算它被分配的一部分?jǐn)?shù)組元素的最大值刁俭,并將結(jié)果添加到ConcurrentLinkedQueue中。在主線程中韧涨,我們從ConcurrentLinkedQueue中取出所有結(jié)果牍戚,找出最大值并輸出。
需要注意的是虑粥,ConcurrentLinkedQueue 雖然是線程安全的如孝,但是并不能保證它的順序,因此在處理結(jié)果時需要考慮順序的問題娩贷。在這個示例中第晰,我們只需要找出最大值,因此順序不影響結(jié)果彬祖。如果需要按照順序處理結(jié)果茁瘦,可以考慮使用其他的線程安全集合。
2. 使用Future+Callable
import java.util.Random;
import java.util.concurrent.*;
public class MaxFinder {
private static final int THREAD_COUNT = 5;
private static final int ARRAY_SIZE = 100000;
private static final int SUB_ARRAY_SIZE = ARRAY_SIZE / THREAD_COUNT;
private static final int MAX_VALUE = 1000000;
public static void main(String[] args) {
int[] arr = generateRandomArray(ARRAY_SIZE);
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
Future<Integer>[] futures = new Future[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
int startIndex = i * SUB_ARRAY_SIZE;
int endIndex = (i + 1) * SUB_ARRAY_SIZE;
futures[i] = executorService.submit(new MaxFinderTask(arr, startIndex, endIndex));
}
int max = Integer.MIN_VALUE;
for (int i = 0; i < THREAD_COUNT; i++) {
try {
int result = futures[i].get();
if (result > max) {
max = result;
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
System.out.println("Max value: " + max);
executorService.shutdown();
}
private static int[] generateRandomArray(int size) {
int[] arr = new int[size];
Random random = new Random();
for (int i = 0; i < size; i++) {
arr[i] = random.nextInt(MAX_VALUE);
}
return arr;
}
private static class MaxFinderTask implements Callable<Integer> {
private final int[] arr;
private final int startIndex;
private final int endIndex;
public MaxFinderTask(int[] arr, int startIndex, int endIndex) {
this.arr = arr;
this.startIndex = startIndex;
this.endIndex = endIndex;
}
@Override
public Integer call() throws Exception {
int max = Integer.MIN_VALUE;
for (int i = startIndex; i < endIndex; i++) {
if (arr[i] > max) {
max = arr[i];
}
}
return max;
}
}
}
這個程序中储笑,我們使用ExecutorService創(chuàng)建了一個線程池甜熔,然后使用Future來獲取每個線程的結(jié)果。在主線程中突倍,我們使用Future.get()方法來等待每個線程的結(jié)果腔稀,并使用它們來找到整個數(shù)組的最大值。注意羽历,在這個例子中焊虏,我們使用Callable接口來創(chuàng)建每個線程的任務(wù),并在任務(wù)的call()方法中返回結(jié)果秕磷。
3. 使用Stream
Stream(流)有點(diǎn)類似于 Golang 的管道诵闭,非常方便。
Java 8引入了Stream API跳夭,可以對集合和數(shù)組等數(shù)據(jù)源進(jìn)行流式操作涂圆,其中包括并行處理數(shù)據(jù)源的能力们镜。下面是一個使用Stream來實(shí)現(xiàn)任務(wù)的示例代碼:
int[] nums = ...; // 10萬個整數(shù)
int parallelism = 5; // 并行度
int max = Arrays.stream(nums)
.parallel()
.unordered()
.mapToInt(Integer::intValue)
.limit((long) Math.ceil(nums.length / (double) parallelism))
.max()
.getAsInt();
這個示例代碼首先將整數(shù)數(shù)組轉(zhuǎn)換為流,然后使用parallel()方法啟用并行流處理润歉,使用unordered()方法告訴Stream不必保證元素順序模狭,使用mapToInt(Integer::intValue)方法將流中的元素轉(zhuǎn)換為整數(shù)類型,使用limit()方法限制每個線程需要處理的元素個數(shù)踩衩,最后使用max()方法找到流中的最大值并返回嚼鹉。
這個示例代碼使用了Java 8的lambda表達(dá)式和方法引用,可以讓代碼更加簡潔和易讀驱富。