Flink Custom Avro Serialization (Source/Sink) to Kafka


Preface

The pom file the environment depends on:

<dependencies>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.6</source>
                <target>1.6</target>
            </configuration>
        </plugin>
    </plugins>
</build>

1. Avro provides technical support in the following five areas:

  • Rich data structures;
  • A compact, fast, binary data format;
  • A container file for storing persistent data;
  • Remote procedure call (RPC);
  • Simple integration with dynamic languages: reading and writing data files and using or implementing RPC protocols require no generated code, while statically typed languages do require code generation.

2. Advantages of Avro

  • Binary messages, with good performance and high efficiency
  • Schemas are described in JSON
  • Schema and data are stored together, so messages are self-describing and no stub code needs to be generated (IDL-based generation is supported)
  • RPC calls exchange schema definitions during the handshake
  • Includes a complete client/server stack for quickly implementing RPC
  • Supports synchronous and asynchronous communication
  • Supports dynamic messages
  • The schema definition can specify a sort order for the data (serialization follows this order)
  • Provides services based on the Jetty kernel as well as Netty-based services

3. The Avro JSON Schema Format

{
    "namespace": "com.avro.bean",
    "type": "record",
    "name": "UserBehavior",
    "fields": [
        {"name": "userId", "type": "long"},
        {"name": "itemId", "type": "long"},
        {"name": "categoryId", "type": "int"},
        {"name": "behavior", "type": "string"},
        {"name": "timestamp", "type": "long"}
    ]
}
  • namespace: the package into which the class is generated
  • type: the Avro type; use record
  • name: the name of the generated class
  • fields: the fields to define

Note: the schema file must use the .avsc extension. A short sketch of using the generated class follows below.
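Running mvn generate-sources triggers the avro-maven-plugin configured above and generates a UserBehavior class under com.avro.bean. Below is a minimal sketch of how that generated class can be used; the sketch's own package and class name are hypothetical and not part of the original project.

package com.avro.demo;   // hypothetical package, used only for this sketch

import com.avro.bean.UserBehavior;   // generated by the avro-maven-plugin from UserBehavior.avsc

public class GeneratedClassSketch {
    public static void main(String[] args) {
        // The generated all-args constructor follows the field order declared in the schema
        UserBehavior behavior = new UserBehavior(543462L, 1715L, 1464116, "pv", 1511658000L);
        // getSchema() returns the schema the class was generated from;
        // the custom serializers and deserializers below rely on it
        System.out.println(behavior.getSchema().toString(true));
        System.out.println(behavior.getBehavior());
    }
}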

4. Custom Serialization to Kafka with Java

First, let's use plain Java Kafka clients to write data to and consume data from Kafka.

4.1 Prepare the test data

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000

4.2 Custom Avro serialization and deserialization

First we need to implement the two Kafka interfaces Serializer and Deserializer, which handle serialization and deserialization respectively (here both are implemented in a single class).

package com.avro.AvroUtil;

import com.avro.bean.UserBehavior;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

/**
 * @author 大數(shù)據(jù)老哥
 * @version V1.0
 * @Package com.avro.AvroUtil
 * @File :SimpleAvroSchemaJava.java
 * @date 2021/1/8 20:02 */
/**
 * Custom serialization and deserialization */
public class SimpleAvroSchemaJava implements Serializer<UserBehavior>, Deserializer<UserBehavior> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    // Serialization method
    @Override
    public byte[] serialize(String s, UserBehavior userBehavior) {
        // Create the datum writer
        SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<UserBehavior>(userBehavior.getSchema());
        // Create a stream to hold the serialized bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Create the binary encoder
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            // Write the record into the stream
            writer.write(userBehavior, encoder);
        } catch (IOException e) {
            e.printStackTrace();
        }

        return out.toByteArray();
    }

    @Override
    public void close() {

    }

    // Deserialization method
    @Override
    public UserBehavior deserialize(String s, byte[] bytes) {
        // Holds the result
        UserBehavior userBehavior = new UserBehavior();
        // Create an input stream to read the binary data
        ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
        // Create the datum reader
        SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<UserBehavior>(userBehavior.getSchema());
        // Create the binary decoder
        BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
        try {
            // Read the record
            userBehavior = stockSpecificDatumReader.read(null, binaryDecoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Return the result
        return userBehavior;
    }
}

4.3 Create the serialization side (Kafka producer)

package com.avro.kafka;
import com.avro.bean.UserBehavior;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * @author 大數(shù)據(jù)老哥
 * @version V1.0
 * @Package com.avro.kafka
 * @File :UserBehaviorProducerKafka.java
 * @date 2021/1/8 20:14 */

public class UserBehaviorProducerKafka {
    public static void main(String[] args) throws InterruptedException {
        // Load the data
        List<UserBehavior> data = getData();
        // Create the configuration
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "com.avro.AvroUtil.SimpleAvroSchemaJava");
        // Create the Kafka producer
        KafkaProducer<String, UserBehavior> userBehaviorProducer = new KafkaProducer<String, UserBehavior>(props);
        // Loop over the records and send them
        for (UserBehavior userBehavior : data) {
            ProducerRecord<String, UserBehavior> producerRecord = new ProducerRecord<String, UserBehavior>("UserBehaviorKafka", userBehavior);
            userBehaviorProducer.send(producerRecord);
            System.out.println("Record sent: " + userBehavior);
            Thread.sleep(1000);
        }
    }

    public static List<UserBehavior> getData() {
        ArrayList<UserBehavior> userBehaviors = new ArrayList<UserBehavior>();
        try {
            BufferedReader br = new BufferedReader(new FileReader(new File("data/UserBehavior.csv")));
            String line = "";
            while ((line = br.readLine()) != null) {
                String[] split = line.split(",");
                userBehaviors.add(new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4])));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return userBehaviors;
    }
}

Note: value.serializer must point to the custom serializer class we just wrote, otherwise it will not take effect. An alternative way to wire up the serializer is sketched below.
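As a side note, the serializer does not have to be configured by class name at all: kafka-clients also allows passing serializer instances directly to the KafkaProducer constructor, which avoids typos in the fully qualified class name. A minimal sketch of that variant follows; the helper class name is hypothetical and not part of the original project.

package com.avro.kafka;

import com.avro.AvroUtil.SimpleAvroSchemaJava;
import com.avro.bean.UserBehavior;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerWiringSketch {   // hypothetical helper class
    public static KafkaProducer<String, UserBehavior> createProducer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers",
                "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
        // Passing the serializer instances directly replaces the
        // key.serializer / value.serializer properties used above
        return new KafkaProducer<String, UserBehavior>(props,
                new StringSerializer(), new SimpleAvroSchemaJava());
    }
}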

4.4 Create the deserialization side (Kafka consumer)

package com.avro.kafka;
import com.avro.bean.UserBehavior;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

/**
 * @author 大數(shù)據(jù)老哥
 * @version V1.0
 * @Package com.avro.kafka
 * @File :UserBehaviorConsumer.java
 * @date 2021/1/8 20:58 */
public class UserBehaviorConsumer {

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092");
        prop.put("group.id", "UserBehavior");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Set the value deserializer to our custom Avro deserializer
        prop.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaJava");
        KafkaConsumer<String, UserBehavior> consumer = new KafkaConsumer<String, UserBehavior>(prop);
        consumer.subscribe(Arrays.asList("UserBehaviorKafka"));
        while (true) {
            ConsumerRecords<String, UserBehavior> poll = consumer.poll(1000);
            for (ConsumerRecord<String, UserBehavior> stringStockConsumerRecord : poll) {
                System.out.println(stringStockConsumerRecord.value());
            }
        }
    }
}

4.5 Run it

Create the Kafka topic and start a console consumer:

# Create the topic
./kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic UserBehaviorKafka
# Simulate a consumer
./kafka-console-consumer.sh --from-beginning --topic UserBehaviorKafka --zookeeper node01:2181,node02:2181,node03:2181

5. Implementing Custom Avro Serialization to Kafka with Flink

At this point many readers will say: the Java version is done, so for Flink isn't it just a matter of swapping in Flink's consumer and producer? Not quite. Flink's Kafka connector does not use Kafka's value.serializer / value.deserializer settings; instead it expects a class that implements Flink's SerializationSchema and DeserializationSchema interfaces, which is exactly what we build in section 5.2.

5.1 Prepare the data

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000

5.2 Create the Flink custom Avro serialization and deserialization schema


package com.avro.AvroUtil;

import com.avro.bean.UserBehavior;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * @author 大數(shù)據(jù)老哥
 * @version V1.0
 * @Package com.avro.AvroUtil
 * @File :SimpleAvroSchemaFlink.java
 * @date 2021/1/8 20:02 */

/**
 * Custom serialization and deserialization */
public class SimpleAvroSchemaFlink implements DeserializationSchema<UserBehavior>, SerializationSchema<UserBehavior> {

    @Override
    public byte[] serialize(UserBehavior userBehavior) {
        // Create the datum writer
        SpecificDatumWriter<UserBehavior> writer = new SpecificDatumWriter<UserBehavior>(userBehavior.getSchema());
        // Create a stream to hold the serialized bytes
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Create the binary encoder
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        try {
            // Write the record into the stream
            writer.write(userBehavior, encoder);
        } catch (IOException e) {
            e.printStackTrace();
        }

        return out.toByteArray();
    }

    @Override
    public TypeInformation<UserBehavior> getProducedType() {
        return TypeInformation.of(UserBehavior.class);
    }

    @Override
    public UserBehavior deserialize(byte[] bytes) throws IOException {
        // Holds the result
        UserBehavior userBehavior = new UserBehavior();
        // Create an input stream to read the binary data
        ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(bytes);
        // Create the datum reader
        SpecificDatumReader<UserBehavior> stockSpecificDatumReader = new SpecificDatumReader<UserBehavior>(userBehavior.getSchema());
        // Create the binary decoder
        BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(arrayInputStream, null);
        try {
            // Read the record
            userBehavior = stockSpecificDatumReader.read(null, binaryDecoder);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Return the result
        return userBehavior;
    }

    @Override
    public boolean isEndOfStream(UserBehavior userBehavior) {
        return false;
    }
}

5.3 Create the Flink Consumer (deserialization)

package com.avro.FlinkKafka

import com.avro.AvroUtil.SimpleAvroSchemaFlink
import com.avro.bean.UserBehavior
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import java.util.Properties

/**
 * @Package com.avro.FlinkKafka
 * @File :UserBehaviorConsumerFlink.java
 * @author 大數(shù)據(jù)老哥
 * @date 2021/1/8 21:18
 * @version V1.0 */
object UserBehaviorConsumerFlink {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // Parallelism 1 makes testing easier
    // 2. Set the Kafka configuration
    val prop = new Properties
    prop.put("bootstrap.servers", "192.168.100.201:9092,192.168.100.202:9092,192.168.100.203:9092")
    prop.put("group.id", "UserBehavior")
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // The custom Avro deserialization is actually done by the schema passed to the
    // connector constructor below, not by this value.deserializer property
    prop.put("value.deserializer", "com.avro.AvroUtil.SimpleAvroSchemaFlink")

    //    val kafka: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String]("UserBehaviorKafka", new SimpleStringSchema(), prop)
    // 3. Build the Kafka connector
    val kafka: FlinkKafkaConsumer011[UserBehavior] = new FlinkKafkaConsumer011[UserBehavior]("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop)

    // 4. Start consuming from the latest offsets
    kafka.setStartFromLatest()
    // 5. Build the data source from Kafka
    val data: DataStream[UserBehavior] = env.addSource(kafka)
    // 6. Print the results
    data.print()
    env.execute("UserBehaviorConsumerFlink")
  }
}

5.4 Create the Flink Producer (serialization)

package com.avro.FlinkKafka

import com.avro.AvroUtil.SimpleAvroSchemaFlink
import com.avro.bean.UserBehavior
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

import java.util.Properties

/**
 * @Package com.avro.FlinkKafka
 * @File :UserBehaviorProducerFlink.java
 * @author 大數(shù)據(jù)老哥
 * @date 2021/1/8 21:38
 * @version V1.0 */
object UserBehaviorProducerFlink {
  def main(args: Array[String]): Unit = {
    // 1. Build the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Read the test data
    val value = env.readTextFile("./data/UserBehavior.csv")
    // 3. Map each CSV line to a UserBehavior record
    val users: DataStream[UserBehavior] = value.map(row => {
      val arr = row.split(",")
      val behavior = new UserBehavior()
      behavior.setUserId(arr(0).toLong)
      behavior.setItemId(arr(1).toLong)
      behavior.setCategoryId(arr(2).toInt)
      behavior.setBehavior(arr(3))
      behavior.setTimestamp(arr(4).toLong)
      behavior
    })
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    // 4. Connect to Kafka
    val producer: FlinkKafkaProducer011[UserBehavior] = new FlinkKafkaProducer011[UserBehavior]("UserBehaviorKafka", new SimpleAvroSchemaFlink(), prop)
    // 5. Write the data to Kafka
    users.addSink(producer)
    // 6. Execute the job
    env.execute("UserBehaviorProducerFlink")
  }
}

5.5 Run it

If you need the full source code, you can download it from GitHub: https://github.com/lhh2002/Flink_Avro

Summary

To be honest, I was also lost when I first implemented this feature, but not knowing something is no reason to stop learning. The question I raised at the start of section 5 is one I ran into myself. When you hit a problem, don't think about giving up; think about how to solve it. My approach at the time was to read the source code and study what others had written, and with persistent effort it finally worked. I also provide a set of Flink interview questions; friends who need it can download it from the GitHub link below. Trust yourself: hard work and sweat will always pay off. I am 大數(shù)據(jù)老哥, see you next time~~~

Resources: to get Flink interview questions, Spark interview questions, must-have software for programmers, Hive interview questions, Hadoop interview questions, Docker interview questions, resume templates, and other resources, please download them from

GitHub: https://github.com/lhh2002/Framework-Of-BigData

Gitee: https://gitee.com/li_hey_hey/Framework-Of-BigData
