分布式鎖實現(xiàn)方案總結(jié)

在傳統(tǒng)單機(jī)項目中為了解決并發(fā)訪問的問題鳖链,我們通常都會使用鎖來實現(xiàn)诚纸。一般使用synchronized關(guān)鍵字來鎖住臨界區(qū)鞠评,當(dāng)然也可以使用Lock鎖驰凛。但是現(xiàn)在很多項目都使用分布式集群方式部署非驮,synchronized關(guān)鍵字無法實現(xiàn)跨機(jī)器之間的鎖交汤,Lock也是如此,為了解決這種問題劫笙,需要設(shè)計一種新的分布式鎖芙扎。

分布式鎖除了具有分布式情況下控制并發(fā)的問題,同時還需要考慮以下問題:

1.較高的性能填大,如果實現(xiàn)分布式鎖的方式太差戒洼,會影響并發(fā)訪問。
2.可重入性栋盹,同一個線程可多次獲取鎖施逾。
3.鎖失效機(jī)制,如果一個線程在獲取鎖后掛掉了例获,需要釋放鎖汉额,否則其他線程將一直獲取不到鎖。
4.如果用戶在失效時間內(nèi)沒有完成任務(wù)榨汤,鎖如何續(xù)期蠕搜,該續(xù)期多長時間。

基于以上問題收壕,共設(shè)計了三種分布式鎖
基于數(shù)據(jù)庫實現(xiàn)分布式鎖
基于數(shù)據(jù)庫實現(xiàn)分布式鎖主要是利用數(shù)據(jù)庫唯一索引的排斥性和天然支持原子操作有關(guān)妓灌,可以設(shè)計如下的表結(jié)構(gòu):

//基于數(shù)據(jù)庫創(chuàng)建分布式鎖
DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock`  (
  `id` bigint(0) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `method_name` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '臨界區(qū)方法',
  `machine_id` varchar(255) CHARACTER SET utf8mb4  NOT NULL COMMENT '機(jī)器id',
  `thread_id` varchar(255) CHARACTER SET utf8mb4  NOT NULL COMMENT '線程id',
  `invalid_time` datetime(0) NOT NULL COMMENT '失效時間',
  `desc` varchar(255) CHARACTER SET utf8mb4 NULL DEFAULT NULL COMMENT '描述',
  `update_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新時間',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `idx_method_name`(`method_name`) USING BTREE COMMENT '方法名主鍵'
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '分布式鎖表' ROW_FORMAT = Dynamic;

加鎖:insert into method_lock(method_name, machine_id, thread_id, invalid_time)values("addUser", "001", "thread-01", "2022-01-01 01:01:01")
釋放鎖:delete from method_lock where method_name = "addUser" and machine_id = "001" and thread_id = "thread-01"
說明:客戶端1加鎖成功后,客戶端再執(zhí)行加鎖語句就會因為method_name創(chuàng)建了唯一索引而失敗蜜宪,這樣就達(dá)到不同機(jī)器并發(fā)訪問的問題虫埂。
以上是使用mysql自身的鎖來完成,mysql的鎖是一種排他鎖圃验,其實也可以利用樂觀鎖來完成掉伏。即通過version來完成鎖控制。
但是還需要考慮上面分布式鎖的四個問題?

1.由于數(shù)據(jù)庫的查詢性能問題斧散,需要使用集群部署或者主從復(fù)制
2.為了完成可重入性供常,我們在表結(jié)構(gòu)中添加了machine_id和thread_id來實現(xiàn)。如果客戶端1再次執(zhí)行加鎖過程鸡捐,需要先根據(jù)機(jī)器和線程信息查詢栈暇,如果查詢到則直接返回。如果沒有則說明沒有加鎖成功箍镜,則嘗試加鎖或者失敗源祈。為了查詢是否已經(jīng)加鎖,需要使用select * from table for update語句鹿寨,此語句可以避免臟讀新博。
3.為了客戶端1線程宕掉后薪夕,不影響其他客戶端操作脚草,需要存儲線程的失效時間,并利用定時任務(wù)清理失效鎖
4.如果線程在規(guī)定時間內(nèi)沒有完成任務(wù)原献,需要利用定時任務(wù)在查詢失效鎖的同時查詢線程狀態(tài)馏慨,如果線程存活就對鎖進(jìn)行續(xù)期。

note:這種方式需要考慮的問題很多姑隅,同時性能也不太理想写隶,所以使用這種方式的很少。

基于redis實現(xiàn)分布式鎖
基于redis實現(xiàn)分布式鎖主要是利用redis的性能和某些特性讲仰。
加鎖:利用redis的SETNX命令和EXPIRE命令完成setnx key val是如果key不存在則val設(shè)置成功慕趴,返回1;否則val設(shè)置失敗鄙陡,返回0冕房。以下語句可以完成。

key是加鎖的方法趁矾,value是線程的唯一標(biāo)識
set nx key value

存在的問題:
①假設(shè)線程獲取了鎖之后耙册,在執(zhí)行任務(wù)的過程中掛掉,但是線程還沒有執(zhí)行del命令釋放鎖毫捣,那么競爭該鎖的線程都會執(zhí)行不了详拙,產(chǎn)生死鎖的情況。
②釋放鎖的時候為了保證只釋放自己的鎖蔓同,需要先判斷加鎖的value饶辙,如果不等于自己才能刪除。但是這兩部操作并不是原子性的斑粱,所以釋放鎖需要使用lua腳本弃揽。

解決方案:如下代碼所示:

//加鎖過程
public Boolean getLock(String key,String value,Long timeout){
   boolean result = this.redisTemplate.opsForValue().setIfAbsent(key,value, Duration.ofSeconds(timeout));
   return result;
 }

//釋放鎖過程
public Long releaseLock(String key,String value){
   String script= "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
   RedisScript<Long> redisScript = new DefaultRedisScript<>(script,Long.class);
   Long result= (Long)this.redisTemplate.execute(redisScript, Collections.singletonList(key),value);
   return result;
}

//調(diào)用過程
//添加自旋邏輯
boolean result = false;
do{
   result = redisService.getLock(key, value, timeout);
}while(!result) 
try{
   ......  //業(yè)務(wù)代碼
}catch(Exception e){
}finally{
   //最后釋放鎖的邏輯
   redisService.releaseLock(key, value);
}

存在的問題:這個方案雖然解決了過期和釋放鎖的問題,但是沒有實現(xiàn)可重入性蹋宦,當(dāng)已經(jīng)加鎖的線程再次加鎖時就會失敗披粟。
解決方案:需要在加鎖失敗后判斷加鎖線程是否是自己,如果是自己則+1返回成功冷冗,否則返回失敗守屉。這樣加鎖和釋放鎖都需要執(zhí)行l(wèi)ua腳本完成多步操作。

local key = KEYS[1]; -- 第1個參數(shù),鎖的key
local value = ARGV[1]; -- 第2個參數(shù),鎖的值蒿辙,可以是線程的唯一標(biāo)識
local timeout = ARGV[2]; -- 第3個參數(shù),鎖的自動釋放時間

-- 判斷鎖是否已存在
if (redis.call('exists', key) == 0) then
   -- 不存在, 則獲取鎖
   redis.call('hset', key, value, '1');
   redis.call('expire', key, timeout);
   return 1;
end ;
-- 鎖已存在拇泛,判斷擁有者是否是自己
if (redis.call('hexists', key, value) == 1) then
   -- 如果是自己,則重入次數(shù)+1
   redis.call('hincrby', key, value, '1');
   redis.call('expire', key, timeout);
   return 1;
end ;
return 0; -- 鎖已經(jīng)被獲取思灌,并且不是自己俺叭,返回獲取鎖失敗
local key = KEYS[1]; -- 第1個參數(shù),鎖的key
local value = ARGV[1]; -- 第2個參數(shù),鎖的值,可以是線程的唯一標(biāo)識

-- 判斷鎖是否已存在
if (redis.call('exists', key) == 0) then
   return 0;
end ;
-- 鎖已存在泰偿,判斷擁有者是否是自己
if (redis.call('hexists', key, value) == 0) then
   return 0;
end ;
local count = redis.call('hincrby', key, value, '-1');
-- 當(dāng)可重入次數(shù)為0時熄守,需要刪除key
if (tonumber(count) == 0) then
   redis.call('del', key);
   return 1;
end ;
return 1;

然而上面的代碼還是有問題,就是續(xù)期的問題耗跛。因為如果線程A在獲取到鎖之后裕照,執(zhí)行業(yè)務(wù)時間較長,鎖過期了调塌。那么其他線程就可以獲得鎖了晋南,所以還需要寫一個定時任務(wù)定時查詢鎖是否快要過期,如果快過期了就續(xù)期一段時間羔砾。
Redisson實現(xiàn)
針對上面這些問題负间,redisson提供了實現(xiàn),它將加鎖和解鎖的lua腳本姜凄,以及續(xù)期功能功能(看門狗)都統(tǒng)一到了一個簡單地邏輯中

@Bean
public RedissonClient getRedisson() {
   Config config = new Config();
   // 修改看門狗默認(rèn)的鎖過期時間
   config.setLockWatchdogTimeout(100);
   config.useSingleServer().setAddress("redis://localhost:6379").setPassword("123456");
   return Redisson.create(config);
}
RLock lock = redissonClient.getLock(key);  //key和上面的redis鍵意思相同
try {
   lock.lock();
   .....  //業(yè)務(wù)代碼       
} catch (Exception e) {
   e.printStackTrace();
} finally {
   lock.unlock();
}

上面代碼就完成了分布式鎖的功能政溃,而且加鎖和解鎖都是通過lua腳本來完成的。簡單方便檀葛,lock.lock()默認(rèn)加鎖時間為30s玩祟, 后臺默認(rèn)會啟動一個WatchDog的看門狗線程,該線程每10s去掃描一下該鎖是否被釋放屿聋,如果沒有釋放那么就延長至30s空扎,這個機(jī)制就是看門狗機(jī)制。如果客戶端請求沒有獲取到鎖润讥,那么它將while循環(huán)獲取繼續(xù)嘗試加鎖转锈。網(wǎng)上關(guān)于redisson的描述圖:


redisson描述
//該方式下會使用默認(rèn)的加鎖時間,即30s楚殿。但是如果自定義程序中自定義加鎖時間撮慨,則看門狗線程不會進(jìn)行鎖續(xù)期。
void lock();
void lock(long leaseTime, TimeUnit unit);
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

單機(jī)模式下,redisson還有一個問題:就是線程A如果剛獲取到redis的鎖砌溺,此時redis宕機(jī)了影涉,重新啟動后鎖過期了,然后線程B這時過來是可以加鎖的规伐,然后A線程也認(rèn)為自己還在加鎖蟹倾,這種問題怎么解決?

redis宕機(jī)的時候猖闪,別那么快去重啟redis鲜棠,可以在你的方法里面設(shè)置事務(wù)的超時時間,等到事務(wù)超時再去重啟redis應(yīng)用培慌。

在redis集群環(huán)境下豁陆,redission實際還存在一些問題,就是線程A如果剛獲取到master節(jié)點的鎖吵护,還沒有同步到slave節(jié)點盒音,此時master節(jié)點給掛了,然后線程B這時過來是可以加鎖的何址,但是實際上master已經(jīng)被A加鎖過了里逆,這就是集群環(huán)境下可能出現(xiàn)的問題进胯?

Redlock 是 Redis 的作者 antirez 給出的集群模式的 Redis 分布式鎖用爪,它基于 N 個完全獨立的 Redis 節(jié)點。
算法描述:
①客戶端嘗試獲取 N 個節(jié)點的鎖胁镐,(每個節(jié)點獲取鎖的方式和前面說的緩存鎖一樣)偎血,N 個節(jié)點以相同的 key 和 value 獲取鎖。而且客戶端需要設(shè)置接口訪問超時盯漂,接口超時時間需要遠(yuǎn)遠(yuǎn)小于鎖超時時間颇玷,這樣可以在有 redis 節(jié)點宕機(jī)后,訪問該節(jié)點時能盡快超時就缆,而減小鎖的正常使用帖渠。
②客戶端需要獲得了超過 半數(shù) 節(jié)點的鎖,而且獲取鎖的時間小于鎖的超時時間竭宰,客戶端才獲得了分布式鎖空郊。
③客戶端獲取的鎖的時間為設(shè)置的鎖超時時間減去獲取鎖花費時間。
④如果客戶端獲取鎖失敗了切揭,會依次請求所有節(jié)點刪除所有的鎖狞甚。
note:使用 Redlock 算法,可以保證在掛掉最多 不超過半數(shù)節(jié)點的時候廓旬,分布式鎖服務(wù)仍然能工作哼审,這相比之前的單機(jī)模式大大提高了可用性。有關(guān)redisson分布式鎖問題的問題可以查看基于Redission實現(xiàn)分布式鎖

基于zookeeper實現(xiàn)分布式鎖
zookeeper是分布式系統(tǒng)協(xié)調(diào)服務(wù),可以解決分布式系統(tǒng)下數(shù)據(jù)同步問題涩盾。我們可以在上面創(chuàng)建樹形結(jié)構(gòu)的節(jié)點十气,相當(dāng)于一個分布式文件系統(tǒng)+watch服務(wù)。而且同一目錄下只能有唯一的文件名春霍。
zookeeper的節(jié)點分為兩類:
1)持久節(jié)點 :節(jié)點一旦創(chuàng)建桦踊,除非被主動刪除,否則一直存在终畅;
2)臨時節(jié)點 :一旦創(chuàng)建該節(jié)點的客戶端會話失效籍胯,則所有該客戶端創(chuàng)建的臨時節(jié)點都會被刪除。

原理: 就是利用zookeeper 的臨時有序節(jié)點和 watcher 機(jī)制來實現(xiàn)离福≌壤牵客戶端創(chuàng)建zookeeper的臨時有序節(jié)點,如果該節(jié)點序號最小則返回成功妖爷,如果不是則監(jiān)聽前一個節(jié)點的蝶涩,一旦客戶端會話結(jié)束,臨時節(jié)點就會釋放絮识,后一個節(jié)點就會監(jiān)聽到該事件并重新獲取鎖绿聘。

面對上面提到的死鎖,可重入次舌,續(xù)期以及非阻塞問題熄攘,zookeeper該如何解決?

死鎖:客戶端會在ZK中創(chuàng)建一個臨時節(jié)點彼念,一旦客戶端獲取到鎖之后突然掛掉(Session連接斷開)挪圾,那么這個臨時節(jié)點就會自動刪除掉。其他客戶端就可以再次獲得鎖逐沙。
不可重入:客戶端在創(chuàng)建節(jié)點的時候哲思,可以把當(dāng)前客戶端的主機(jī)信息和線程信息直接寫入到節(jié)點中,下次想要獲取鎖的時候和當(dāng)前最小的節(jié)點中的數(shù)據(jù)比對一下就可以了吩案。如果相同棚赔,那么自己直接獲取到鎖,如果不一樣就再創(chuàng)建一個臨時的順序節(jié)點徘郭,參與排隊靠益。
續(xù)期問題:zookeeper的臨時節(jié)點不存在過期時間,所以只有在客戶端主動刪除節(jié)點或者斷開連接的情況下才會失效崎岂,所以不存在續(xù)期問題捆毫。
非阻塞鎖:客戶端可以通過在節(jié)點上添加watch監(jiān)聽,一旦節(jié)點有變化冲甘,zookeeper會通知客戶端绩卤。不需要空轉(zhuǎn)途样。

springboot代碼如下:

<dependency>
   <groupId>org.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
</dependency>
ZKLockService zkLockService = new ZKLockService(zooKeeper);
try {
   zkLockService.tryLock(name);
   //xxxxxxx     // 業(yè)務(wù)邏輯          
} catch (Exception e) {
   e.printStackTrace();
} finally {
   zkLockService.unLock();
}
public class ZKLockService implements AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback, Watcher {
   private ZooKeeper zooKeeper;
   private String nodeName;
   private CountDownLatch countDownLatch = new CountDownLatch(1);
   private final static String PARENT_NODE = "/zkLock";

   public ZKLockService(ZooKeeper zooKeeper) {
       this.zooKeeper = zooKeeper;
   }

   // 創(chuàng)建節(jié)點之后需要阻塞等待,因為zk獲取子節(jié)點的方式是通過回調(diào)的方式濒憋。
   public void tryLock(String value) throws InterruptedException {
       zooKeeper.create(PARENT_NODE + "/lock", value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "orderLock");
       countDownLatch.await();
   }

   public void unLock() throws InterruptedException, KeeperException {
       zooKeeper.delete(nodeName, -1);
   }

   //創(chuàng)建子節(jié)點的回調(diào)函數(shù)何暇,獲取子節(jié)點信息
   @Override
   public void processResult(int i, String s, Object o, String s1) {
       this.nodeName = s1;
       zooKeeper.getChildren(PARENT_NODE, false, this, o);
   }

   //獲取子節(jié)點信息的回調(diào)函數(shù),判斷是否加鎖成功凛驮;如果加鎖失敗裆站,則監(jiān)聽前一個節(jié)點
   @Override
   public void processResult(int i, String s, Object o, List<String> list, Stat stat) {
       if (list == null || list.isEmpty()) {
           System.out.println("children is null......");
           return;
       }
       Collections.sort(list);

       String minPath = list.get(0);
       String tempNodeName = this.nodeName.substring(8);
       if (minPath.equals(tempNodeName)) {
           System.out.println("加鎖成功");
           countDownLatch.countDown();
       } else {
           System.out.println("加鎖失敗");
           int index = list.indexOf(tempNodeName);
           // 只監(jiān)聽前一個結(jié)點,當(dāng)前一個節(jié)點掛掉黔夭,只需要通過后一個節(jié)點即可宏胯。能避免羊群效應(yīng)
           zooKeeper.exists(PARENT_NODE + "/" + list.get(index - 1), this, this, o);
       }
   }

   //監(jiān)聽前一個節(jié)點的方法實現(xiàn),當(dāng)前一個節(jié)點被刪除后本姥,重新獲取子節(jié)點信息比較判斷
   @Override
   public void process(WatchedEvent watchedEvent) {
       switch (watchedEvent.getType()) {
           case NodeDeleted:
               zooKeeper.getChildren(PARENT_NODE, false, this, "orderLock");
               break;
           default:
               break;
       }
   }

   @Override
   public void processResult(int i, String s, Object o, Stat stat) {
   }
}

note:上段代碼create和getChildren方法使用的都是zookeeper的異步方法肩袍,如果使用同步方法,可以這樣寫:

public class ZKLockServiceNew implements AsyncCallback.StatCallback, Watcher {
   private ZooKeeper zooKeeper;
   private String nodeName;
   private final static String PARENT_NODE = "/zkLock";
   private CountDownLatch countDownLatch = new CountDownLatch(1);

   public ZKLockServiceNew(ZooKeeper zooKeeper) {
       this.zooKeeper = zooKeeper;
   }

   public void tryLock(String value) throws InterruptedException, KeeperException {
       String path = zooKeeper.create(PARENT_NODE + "/lock", value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
       nodeName = path;
       boolean flag = false;
       do {
           flag = verifyMinNode(path);
       } while (!flag);
   }

   public void unLock() throws InterruptedException, KeeperException {
       zooKeeper.delete(nodeName, -1);
   }

   @Override
   public void process(WatchedEvent watchedEvent) {
       switch (watchedEvent.getType()) {
           case NodeDeleted:
               countDownLatch.countDown();
               break;
           default:
               break;
       }
   }

   @Override
   public void processResult(int i, String s, Object o, Stat stat) {
   }

   private boolean verifyMinNode(String path) throws KeeperException, InterruptedException {
       List<String> children = zooKeeper.getChildren(PARENT_NODE, false);
       if (children == null || children.isEmpty()) {
           System.out.println("children is null......");
           return false;
       }
       Collections.sort(children);

       String minPath = children.get(0);
       String tempNodeName = this.nodeName.substring(8);
       if (minPath.equals(tempNodeName)) {
           System.out.println("加鎖成功");
           return true;
       } else {
           System.out.println("加鎖失敗");
           int index = children.indexOf(tempNodeName);
           zooKeeper.exists(PARENT_NODE + "/" + children.get(index - 1), this, this, "AAA");
           countDownLatch.await();
       }
       return false;
   }
}

Curator是已經(jīng)封裝好的zookeeper類婚惫,通過它可以更方便的完成分布式鎖的功能氛赐,以下是示例代碼:

public class CuratorTest {
   public static void main(String[] args) throws Exception {
       CuratorFramework zkClient = getClient();
       InterProcessMutex zkMutex = new InterProcessMutex(zkClient, "/test/mutex");

       ExecutorService executor = Executors.newFixedThreadPool(10);
       for (int i = 0; i < 10; i++) {
           final int temp = i;
           executor.submit(() -> {
               System.out.println("線程" + temp + "啟動");
               try {
                   zkMutex.acquire(); //阻塞等待,也可超時等待
                   System.out.println("線程" + temp + "獲取到鎖");
                   Thread.sleep(3000);
                   zkMutex.release();
                   System.out.println("線程" + temp + "釋放鎖");
               } catch (Exception e) {
                   throw new RuntimeException(e);
               }
           });
       }
       TimeUnit.SECONDS.sleep(60);

       System.out.println("end----");
   }

   public static CuratorFramework getClient() {
       ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
       CuratorFramework zkClient = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
       zkClient.start();
       return zkClient;
   }
}

參照:
實現(xiàn)分布式鎖的三種方式
實現(xiàn)分布式鎖的方式
分布式鎖
elk
elk基礎(chǔ)
分布式基礎(chǔ)
分布式基礎(chǔ)理論
Redis可重入鎖的實現(xiàn)設(shè)計
Zookeeper介紹
zookeeper介紹
zookeeper分布式鎖

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末先舷,一起剝皮案震驚了整個濱河市艰管,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蒋川,老刑警劉巖牲芋,帶你破解...
    沈念sama閱讀 217,509評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異尔破,居然都是意外死亡街图,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評論 3 394
  • 文/潘曉璐 我一進(jìn)店門懒构,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人耘擂,你說我怎么就攤上這事胆剧。” “怎么了醉冤?”我有些...
    開封第一講書人閱讀 163,875評論 0 354
  • 文/不壞的土叔 我叫張陵秩霍,是天一觀的道長。 經(jīng)常有香客問我蚁阳,道長铃绒,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,441評論 1 293
  • 正文 為了忘掉前任螺捐,我火速辦了婚禮颠悬,結(jié)果婚禮上矮燎,老公的妹妹穿的比我還像新娘。我一直安慰自己赔癌,他們只是感情好诞外,可當(dāng)我...
    茶點故事閱讀 67,488評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著灾票,像睡著了一般峡谊。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上刊苍,一...
    開封第一講書人閱讀 51,365評論 1 302
  • 那天既们,我揣著相機(jī)與錄音,去河邊找鬼正什。 笑死贤壁,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的埠忘。 我是一名探鬼主播脾拆,決...
    沈念sama閱讀 40,190評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼莹妒!你這毒婦竟也來了名船?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,062評論 0 276
  • 序言:老撾萬榮一對情侶失蹤旨怠,失蹤者是張志新(化名)和其女友劉穎渠驼,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鉴腻,經(jīng)...
    沈念sama閱讀 45,500評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡迷扇,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,706評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了爽哎。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蜓席。...
    茶點故事閱讀 39,834評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖课锌,靈堂內(nèi)的尸體忽然破棺而出厨内,到底是詐尸還是另有隱情,我是刑警寧澤渺贤,帶...
    沈念sama閱讀 35,559評論 5 345
  • 正文 年R本政府宣布雏胃,位于F島的核電站,受9級特大地震影響志鞍,放射性物質(zhì)發(fā)生泄漏瞭亮。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,167評論 3 328
  • 文/蒙蒙 一固棚、第九天 我趴在偏房一處隱蔽的房頂上張望统翩。 院中可真熱鬧仙蚜,春花似錦、人聲如沸唆缴。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽面徽。三九已至艳丛,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間趟紊,已是汗流浹背氮双。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留霎匈,地道東北人戴差。 一個月前我還...
    沈念sama閱讀 47,958評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像铛嘱,于是被迫代替她去往敵國和親暖释。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,779評論 2 354

推薦閱讀更多精彩內(nèi)容