在項(xiàng)目中显熏,團(tuán)隊(duì)也使用了Kafka作為消息中間件。 經(jīng)過(guò)了嵌入式redis選型的問(wèn)題之后晒屎,筆者在嵌入式kafka選型時(shí)就更傾向于還在持續(xù)更新喘蟆,并且維護(hù)人員是一個(gè)團(tuán)隊(duì)而不是個(gè)人或者松散的組織。 最終鼓鲁,筆者選擇了來(lái)自salesforce的開(kāi)源項(xiàng)目
<groupId>com.salesforce.kafka.test</groupId>
<artifactId>kafka-junit</artifactId>
<version>3.0.1</version>
以下是在項(xiàng)目自帶的測(cè)試用例代碼上稍加修改的案例蕴轨。
package com.salesforce.kafka.test.junit4;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Collection;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.Node;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.salesforce.kafka.test.KafkaBroker;
import com.salesforce.kafka.test.KafkaTestServer;
public class KafkaBrokerTest {
private static final Logger logger = LoggerFactory.getLogger(KafkaBrokerTest.class);
/**
* Validate that we started 2 brokers.
* @throws Exception
*/
@Test
public void testTwoBrokersStarted() throws Exception {
Properties overrideProperties = new Properties();
overrideProperties.put("broker.id", "1");
overrideProperties.put("port", "6666");
KafkaTestServer server1= new KafkaTestServer(overrideProperties);
server1.start();
String string= server1.getKafkaBrokers().getBrokerById(1).getConnectString();
logger.info("\n"+string+"\n");
overrideProperties.put("broker.id", "2");
overrideProperties.put("port", "8888");
KafkaTestServer server2= new KafkaTestServer(overrideProperties);
server2.start();
String string2= server2.getKafkaBrokers().getBrokerById(2).getConnectString();
logger.info("\n"+string2+"\n");
}
}
問(wèn)題
從上述案例中可以看出,在實(shí)際的kafka使用中坐桩,IP+端口號(hào)是每個(gè)kafka broker都不一樣的尺棋。但是在salesforce/kafka-core中提供的KafkaTestCluster類(lèi)中封锉,其并沒(méi)有給外部來(lái)指定某個(gè)broker port的機(jī)會(huì)绵跷。
/**
* Starts the cluster.
* @throws Exception on startup errors.
* @throws TimeoutException When the cluster fails to start up within a timely manner.
*/
public void start() throws Exception, TimeoutException {
// Ensure zookeeper instance has been started.
zkTestServer.start();
// If we have no brokers defined yet...
if (brokers.isEmpty()) {
// Loop over brokers, starting with brokerId 1.
for (int brokerId = 1; brokerId <= numberOfBrokers; brokerId++) {
// Create properties for brokers
final Properties brokerProperties = new Properties();
// Add user defined properties.
brokerProperties.putAll(overrideBrokerProperties);
// Set broker.id
brokerProperties.put("broker.id", String.valueOf(brokerId));
// Create new KafkaTestServer and add to our broker list
brokers.add(
new KafkaTestServer(brokerProperties, zkTestServer)
);
}
}
// Loop over each broker and start it
for (final KafkaTestServer broker : brokers) {
broker.start();
}
// Block until the cluster is 'up' or the timeout is exceeded.
waitUntilClusterReady(10_000L);
}
這在某些需要預(yù)先指定IP+端口號(hào)的場(chǎng)景中還是有一些麻煩的。需要后續(xù)進(jìn)行處理成福。例如碾局,給這個(gè)類(lèi)新增一個(gè)構(gòu)造方法,利用以下的List ,把已經(jīng)完成初始化的List<KafkaTestServer> brokers 作為入?yún)鬟f進(jìn)去奴艾。
private final List<KafkaTestServer> brokers = new ArrayList<>();
新增構(gòu)造方法:
public KafkaTestCluster(final List<KafkaTestServer> brokers)