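This article shows how to integrate Spring Boot with Redis to implement queue storage. Queue storage is usually exposed as a REST microservice, so Spring Boot plus Redis is a natural fit.
A typical use case is storing the task list in a crawler system: each crawler subprocess (or thread) independently and actively polls the queue for URLs, and batch fetching is supported.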
- Step1:
Add the dependency to the Maven configuration of the Spring Boot project:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
This article uses Spring Boot 1.5.2.RELEASE.
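The JedisConnectionFactory that Step2 autowires is created by Spring Boot's auto-configuration from the `spring.redis.*` properties. A minimal `application.properties` fragment might look like the following; the host, port, and database values are assumptions for a local Redis instance, not settings taken from the article.

```properties
# Assumed values for a local Redis instance; adjust for your environment
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.database=0
```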
- Step2:
Define the required beans in the Application.java entry point:
@SpringBootApplication(scanBasePackages = {
"", "" })
public class Application implements CommandLineRunner {
@Autowired private JedisConnectionFactory jedisConnFactory;
@Bean
public StringRedisTemplate redisTemplate() {
StringRedisTemplate redisTemplate = new StringRedisTemplate();
redisTemplate.setConnectionFactory(jedisConnFactory);
return redisTemplate;
}
@Bean
public QueueService queueService() {
return new QueueServiceSDRImpl(redisTemplate());
}
public static void main(String[] args) throws InterruptedException {
SpringApplication app = new SpringApplication(Application.class);
app.setBannerMode(Banner.Mode.CONSOLE);
app.setWebEnvironment(true);
app.run(args);
}
@Override
public void run(String... args) throws Exception {
System.out.println("Project: running...");
}
}
Only a StringRedisTemplate is defined here. If you need to store objects, convert them to JSON manually before storing them.
- Step3: Define the QueueService interface:
public interface QueueService {
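/**
* Fetch up to N URLs from the head of the queue.
* @param fullTaskName task name, used to build the Redis key
* @param numbersOfURL maximum number of URLs to fetch
* @return the dequeued URLs, possibly fewer than requested
*/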
public List<BasicWebURL> fetchN(String fullTaskName, Long numbersOfURL);
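/**
* Append URLs to the tail of the queue.
* @param fullTaskName task name, used to build the Redis key
* @param webURLJSONStringArray URLs to enqueue, each serialized as a JSON string
* @return the queue length after the push, or -1 on failure
*/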
public Long enQueue(String fullTaskName, String... webURLJSONStringArray);
/**
* Length of the URL queue.
* @param fullTaskName task name, used to build the Redis key
* @return the number of URLs currently queued
*/
public Long queueSize(String fullTaskName);
/**
* Clear the URL queue (the implementation below also clears the visit history).
* @param fullTaskName task name, used to build the Redis key
*/
public void queueDump(String fullTaskName);
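/**
* Check whether a URL has already been visited.
* @param fullTaskName task name, used to build the Redis key
* @param url the URL to look up
* @return true if the URL is in the visit history
*/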
public Boolean hasVisit(String fullTaskName, String url);
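/**
* Record links as visited.
* @param fullTaskName task name, used to build the Redis key
* @param visitedLinkArray the links to add to the visit history
* @return the number of links newly added, or -1 on failure
*/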
public Long saveURL(String fullTaskName, String... visitedLinkArray);
}
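To show how a crawler might use these operations together, here is a minimal sketch of a worker loop. It does not use the article's classes: `UrlQueue` is a hypothetical, simplified stand-in for QueueService (a single task, plain String URLs instead of BasicWebURL), and `InMemoryUrlQueue` is an assumed in-memory substitute so the example runs without Redis.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class CrawlerWorkerSketch {
    // Hypothetical, simplified stand-in for QueueService: one task, String URLs.
    interface UrlQueue {
        List<String> fetchN(long n);
        void enQueue(String... urls);
        boolean hasVisit(String url);
        void saveURL(String... urls);
    }

    // In-memory implementation (an assumption) so the sketch runs without Redis.
    static class InMemoryUrlQueue implements UrlQueue {
        private final Deque<String> queue = new ArrayDeque<>();
        private final Set<String> history = new HashSet<>();

        public List<String> fetchN(long n) {
            List<String> out = new ArrayList<>();
            // Pop from the head; stop early if the queue runs dry.
            for (long i = 0; i < n && !queue.isEmpty(); i++) {
                out.add(queue.pollFirst());
            }
            return out;
        }

        public void enQueue(String... urls) {
            for (String u : urls) queue.addLast(u);
        }

        public boolean hasVisit(String url) {
            return history.contains(url);
        }

        public void saveURL(String... urls) {
            for (String u : urls) history.add(u);
        }
    }

    // Pulls batches until the queue is empty, skipping URLs already visited.
    // Returns the URLs that were "crawled", in order.
    static List<String> drain(UrlQueue q, long batchSize) {
        List<String> crawled = new ArrayList<>();
        List<String> batch = q.fetchN(batchSize);
        while (!batch.isEmpty()) {
            for (String url : batch) {
                if (q.hasVisit(url)) continue; // duplicate, skip it
                crawled.add(url);              // a real crawler would download here
                q.saveURL(url);                // remember it for later checks
            }
            batch = q.fetchN(batchSize);
        }
        return crawled;
    }

    public static void main(String[] args) {
        UrlQueue q = new InMemoryUrlQueue();
        q.enQueue("http://a", "http://b", "http://a", "http://c");
        System.out.println(drain(q, 2)); // prints [http://a, http://b, http://c]
    }
}
```

The duplicate `http://a` is dequeued in the second batch but skipped because it is already in the history, which is the role hasVisit and saveURL play next to fetchN.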
- Step4: The concrete implementation, QueueServiceSDRImpl.java:
public class QueueServiceSDRImpl implements QueueService {
private final StringRedisTemplate redisTemplate;
// Key prefixes: HIST: for the visited-link set, QUEUE: for the URL list
private static final String HEAD_HISTORY = "HIST:";
private static final String HEAD_QUEUE = "QUEUE:";
public QueueServiceSDRImpl(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Override
public List<BasicWebURL> fetchN(String fullTaskName, Long numbersOfURL) {
List<Object> results = redisTemplate.executePipelined(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisConnection stringRedisConn = (StringRedisConnection) connection;
for (int i = 0; i < numbersOfURL; i++) {
stringRedisConn.lPop(HEAD_QUEUE.concat(fullTaskName));
}
return null;
}
});
return results.stream().filter(obj -> obj != null).map(obj -> JSONObject.parseObject(obj.toString(), BasicWebURL.class)).collect(Collectors.toList());
}
@Override
public Long enQueue(String fullTaskName, String... webURLJSONStringArray) {
Long result = -1L;
BoundListOperations<String, String> opt = redisTemplate.boundListOps(HEAD_QUEUE.concat(fullTaskName));
try {
// rightPushAll returns the list length after the push
result = opt.rightPushAll(webURLJSONStringArray);
} catch (JedisException e) {
e.printStackTrace();
}
return result;
}
@Override
public Long queueSize(String fullTaskName) {
return redisTemplate.boundListOps(HEAD_QUEUE.concat(fullTaskName)).size();
}
@Override
public void queueDump(String fullTaskName) {
// Delete both the queue and the visit history for this task
redisTemplate.delete(HEAD_QUEUE.concat(fullTaskName));
redisTemplate.delete(HEAD_HISTORY.concat(fullTaskName));
}
@Override
public Boolean hasVisit(String fullTaskName, String url) {
Boolean hasVisit = false;
try {
hasVisit = redisTemplate.boundSetOps(HEAD_HISTORY.concat(fullTaskName)).isMember(url);
} catch (JedisException e) {
e.printStackTrace();
}
return hasVisit;
}
@Override
public Long saveURL(String fullTaskName, String... visitedLinkArray) {
Long result = -1L;
try {
result = redisTemplate.boundSetOps(HEAD_HISTORY.concat(fullTaskName)).add(visitedLinkArray);
} catch (JedisException e) {
e.printStackTrace();
}
return result;
}
}
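QueueServiceSDRImpl implements two stores whose values are a Redis List and a Redis Set. Their behavior corresponds to Java's List (ordered) and Set (duplicate-free). fullTaskName, combined with the QUEUE: or HIST: prefix, is the key under which Spring's Redis wrapper stores each structure.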
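The List and Set contrast can be seen with plain Java collections. The sketch below is only an analogy for the two Redis structures, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ListVersusSetSketch {
    // List-style storage: insertion order and duplicates are both preserved.
    static List<String> asQueue(String... urls) {
        List<String> queue = new ArrayList<>();
        for (String u : urls) queue.add(u);
        return queue;
    }

    // Set-style storage: returns how many of the given URLs were new.
    static int countNew(Set<String> history, String... urls) {
        int added = 0;
        for (String u : urls) {
            if (history.add(u)) added++; // add() returns false for duplicates
        }
        return added;
    }

    public static void main(String[] args) {
        System.out.println(asQueue("a", "b", "a"));                  // prints [a, b, a]
        System.out.println(countNew(new HashSet<>(), "a", "b", "a")); // prints 2
    }
}
```

The queue keeps the repeated entry, while the history counts it once, which is why the queue is a List and the visit history is a Set.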
- Step5: Finally, here is how QueueController exposes the service endpoints:
@RestController
@RequestMapping()
public class QueueController {
@Autowired QueueService queueService;
/**
* Fetch n BasicWebURLs.
* @param request
* @param fullTaskName
* @param numbersOfURL
* @return
*/
@GetMapping("/queue/{fullTaskName}")
public JSONObject webURL(HttpServletRequest request,
@PathVariable String fullTaskName,
@RequestParam(defaultValue="10", required=false) Long numbersOfURL) {
JSONObject jo = new JSONObject();
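if(numbersOfURL > 0) {
List<BasicWebURL> data = queueService.fetchN(fullTaskName, numbersOfURL);
// Report how many URLs were actually popped, which may be fewer than requested
jo.put("popLength", data.size());
jo.put("data", data);
}else{
jo.put("popLength", 0);
jo.put("data", Lists.newArrayList());
}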
jo.put("stillHas", queueService.queueSize(fullTaskName));
return jo;
}
/**
* Enqueue URLs. The request body is a JSON object with a "webURLs" array.
* @param request
* @param fullTaskName
* @param body
* @return
*/
@PostMapping("/queue/{fullTaskName}")
public Long enQueue(HttpServletRequest request, @PathVariable String fullTaskName, @RequestBody String body) {
JSONObject jo = JSONObject.parseObject(body);
if(jo != null){
JSONArray webURLList = jo.getJSONArray("webURLs");
// getJSONArray returns null when the key is missing, so guard against it
if(webURLList != null && !webURLList.isEmpty()) {
String [] jsonArray = webURLList.stream().map(item -> item.toString()).toArray(String[]::new);
return queueService.enQueue(fullTaskName, jsonArray);
}
}
return -1L;
}
/**
* Clear the queue and the visit history for a task.
* @param request
* @param fullTaskName
* @return
*/
@DeleteMapping("/queue/{fullTaskName}")
public Integer queueDump(HttpServletRequest request, @PathVariable String fullTaskName) {
queueService.queueDump(fullTaskName);
return 1;
}
}
And, for the crawler scenario described earlier, the endpoints used for duplicate checking:
@RestController
@RequestMapping("/link")
@Getter
@Setter
public class VisitedLinkController {
@Autowired QueueService queueService;
@GetMapping("/{fullTaskName}")
public String webURL(HttpServletRequest request,
@PathVariable String fullTaskName,
@RequestParam(defaultValue="", required = false) String link) {
return queueService.hasVisit(fullTaskName, link) ? "y" : "n";
}
/**
* Add links to the visit history. The request body is a JSON object with a "visitedLinks" array.
* @param request
* @param fullTaskName
* @param body
* @return
*/
@PostMapping("/{fullTaskName}")
public Boolean visitLinks(HttpServletRequest request,
@PathVariable String fullTaskName, @RequestBody String body) {
JSONObject jo = JSONObject.parseObject(body);
if(jo != null){
JSONArray webURLList = jo.getJSONArray("visitedLinks");
// getJSONArray returns null when the key is missing, so guard against it
if(webURLList != null && !webURLList.isEmpty()) {
String [] jsonArray = webURLList.stream().map(item -> item.toString()).toArray(String[]::new);
queueService.saveURL(fullTaskName, jsonArray);
}
}
return true;
}
}
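Because the GET endpoint receives the link to check as a query parameter, a client has to URL-encode it. Otherwise characters such as `?` and `&` inside the link would be read as part of the request's own query string. The following sketch builds such a request URL. The class and method names are hypothetical, the base URL `http://localhost:8080` is an assumption, and the task name is assumed to contain only URL-safe characters.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class VisitedLinkUrlSketch {
    // Builds the URL for GET /link/{fullTaskName}?link=... with the link encoded.
    static String hasVisitUrl(String baseUrl, String fullTaskName, String link) {
        try {
            return baseUrl + "/link/" + fullTaskName
                    + "?link=" + URLEncoder.encode(link, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a conforming JVM
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(hasVisitUrl("http://localhost:8080", "demo",
                "http://example.com/a?b=1&c=2"));
        // prints http://localhost:8080/link/demo?link=http%3A%2F%2Fexample.com%2Fa%3Fb%3D1%26c%3D2
    }
}
```

The endpoint answers with the plain string "y" or "n", so the client only needs to compare the response body.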
The JSONObject and JSONArray classes used in the code above come from fastjson:
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;