Flink 源碼之安全認(rèn)證

Flink源碼分析系列文檔目錄

請(qǐng)點(diǎn)擊:Flink 源碼分析系列文檔目錄

前言

本篇分析下Flink安全認(rèn)證部分的處理方式樊销。主要為Kerberos認(rèn)證相關(guān)內(nèi)容意荤。下面從配置項(xiàng)開始分析冠骄。

SecurityConfiguration

此類包含了Flink安全認(rèn)證相關(guān)的配置項(xiàng)偶摔。它們的含義如下:

zookeeper.sasl.disable:是否啟用Zookeeper SASL耳标。

security.kerberos.login.keytab:Kerberos認(rèn)證keytab文件的路徑忌锯。

security.kerberos.login.principal:Kerberos認(rèn)證principal垄提。

security.kerberos.login.use-ticket-cache:Kerberos認(rèn)證是否使用票據(jù)緩存枫慷。

security.kerberos.login.contexts:Kerberos登錄上下文名稱,等效于JAAS文件的entry name涡戳。

zookeeper.sasl.service-name:Zookeeper SASL服務(wù)名结蟋。默認(rèn)為zookeeper

zookeeper.sasl.login-context-name:Zookeeper SASL登陸上下文名稱渔彰。默認(rèn)為Client嵌屎。

security.context.factory.classes:包含哪些 SecurityContextFactory。默認(rèn)值為:

  • org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
  • org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory

security.module.factory.classes:包含哪些SecurityModuleFactory恍涂。 默認(rèn)值為:

  • org.apache.flink.runtime.security.modules.HadoopModuleFactory
  • org.apache.flink.runtime.security.modules.JaasModuleFactory
  • org.apache.flink.runtime.security.modules.ZookeeperModuleFactory

SecurityUtils

SecurityUtils.install方法是提交Flink任務(wù)安全認(rèn)證的入口方法宝惰,用于安裝安全配置。它的代碼如下所示:

public static void install(SecurityConfiguration config) throws Exception {
    // Install the security modules first before installing the security context
    // 安裝安全模塊
    installModules(config);
    // 安裝安全上下文
    installContext(config);
}

installModules方法用于安裝安全認(rèn)證模塊再沧。安全認(rèn)證模塊的內(nèi)容在后面分析掌测。

static void installModules(SecurityConfiguration config) throws Exception {

    // install the security module factories
    List<SecurityModule> modules = new ArrayList<>();
    // 遍歷所有SecurityModuleFactory的配置
    for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
        SecurityModuleFactory moduleFactory = null;
        try {
            // 使用ServiceLoader加載ModuleFactory
            moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.error("Unable to instantiate security module factory {}", moduleFactoryClass);
            throw new IllegalArgumentException("Unable to find module factory class", ne);
        }
        // 使用factory創(chuàng)建出SecurityModule
        SecurityModule module = moduleFactory.createModule(config);
        // can be null if a SecurityModule is not supported in the current environment
        // 安裝module
        // 添加module到modules集合
        if (module != null) {
            module.install();
            modules.add(module);
        }
    }
    installedModules = modules;
}

installContext方法用于安裝安全上下文環(huán)境,它的用途同樣在后面章節(jié)介紹产园。

static void installContext(SecurityConfiguration config) throws Exception {
    // install the security context factory
    // 遍歷SecurityContextFactories
    // 配置項(xiàng)名稱為security.context.factory.classes
    for (String contextFactoryClass : config.getSecurityContextFactories()) {
        try {
            // 使用ServiceLoader汞斧,加載SecurityContextFactory
            SecurityContextFactory contextFactory =
                SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
            // 檢查SecurityContextFactory是否和配置文件兼容(1)
            if (contextFactory.isCompatibleWith(config)) {
                try {
                    // 創(chuàng)建出第一個(gè)兼容的SecurityContext
                    installedContext = contextFactory.createContext(config);
                    // install the first context that's compatible and ignore the remaining.
                    break;
                } catch (SecurityContextInitializeException e) {
                    LOG.error(
                        "Cannot instantiate security context with: " + contextFactoryClass,
                        e);
                } catch (LinkageError le) {
                    LOG.error(
                        "Error occur when instantiate security context with: "
                        + contextFactoryClass,
                        le);
                }
            } else {
                LOG.debug("Unable to install security context factory {}", contextFactoryClass);
            }
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
        }
    }
    if (installedContext == null) {
        LOG.error("Unable to install a valid security context factory!");
        throw new Exception("Unable to install a valid security context factory!");
    }
}

數(shù)字標(biāo)注內(nèi)容解析:

  1. 這里分析下isCompatibleWith方法邏輯,SecurityContextFactory具有HadoopSecurityContextFactoryNoOpSecurityContextFactory兩個(gè)實(shí)現(xiàn)類什燕。其中HadoopSecurityContextFactory要求security.module.factory.classes配置項(xiàng)包含org.apache.flink.runtime.security.modules.HadoopModuleFactory粘勒,并且要求org.apache.hadoop.security.UserGroupInformation在classpath中。NoOpSecurityContextFactory無任何要求屎即。

SecurityModule

SecurityModule分別為不同類型服務(wù)提供安全認(rèn)證功能庙睡,包含3個(gè)子類:

  • HadoopModule:使用UserGroupInformation方式認(rèn)證事富。
  • JaasModule:負(fù)責(zé)安裝JAAS配置,在進(jìn)程范圍內(nèi)生效乘陪。
  • ZookeeperModule:提供Zookeeper安全配置统台。

HadoopModule

HadoopModule包含了Flink的SecurityConfiguration和Hadoop的配置信息(從Hadoop配置文件讀取,讀取邏輯在HadoopUtils.getHadoopConfiguration啡邑,后面分析)贱勃。

install方法

install方法使用Hadoop提供的UserGroupInformation進(jìn)行認(rèn)證操作。

@Override
public void install() throws SecurityInstallException {

    // UGI設(shè)置hadoop conf
    UserGroupInformation.setConfiguration(hadoopConfiguration);

    UserGroupInformation loginUser;

    try {
        // 如果Hadoop啟用了安全配置
        if (UserGroupInformation.isSecurityEnabled()
            && !StringUtils.isBlank(securityConfig.getKeytab())
            && !StringUtils.isBlank(securityConfig.getPrincipal())) {
            // 獲取keytab路徑
            String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();

            // 使用UGI認(rèn)證Flink conf中配置的keytab和principal
            UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);

            // 獲取認(rèn)證的用戶
            loginUser = UserGroupInformation.getLoginUser();

            // supplement with any available tokens
            // 從HADOOP_TOKEN_FILE_LOCATION讀取token緩存文件
            String fileLocation =
                System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
            // 如果有本地token緩存
            if (fileLocation != null) {
                Credentials credentialsFromTokenStorageFile =
                    Credentials.readTokenStorageFile(
                    new File(fileLocation), hadoopConfiguration);

                // if UGI uses Kerberos keytabs for login, do not load HDFS delegation token
                // since
                // the UGI would prefer the delegation token instead, which eventually expires
                // and does not fallback to using Kerberos tickets
                // 如果UGI使用keytab方式登錄谤逼,不用加載HDFS的delegation token
                // 因?yàn)閁GI傾向于使用delegation token贵扰,這些token最終會(huì)失效,不會(huì)使用kerberos票據(jù)
                Credentials credentialsToBeAdded = new Credentials();
                final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
                Collection<Token<? extends TokenIdentifier>> usrTok =
                    credentialsFromTokenStorageFile.getAllTokens();
                // If UGI use keytab for login, do not load HDFS delegation token.
                // 遍歷token存儲(chǔ)文件中的token
                // 將所有的非delegation token添加到憑據(jù)中
                for (Token<? extends TokenIdentifier> token : usrTok) {
                    if (!token.getKind().equals(hdfsDelegationTokenKind)) {
                        final Text id = new Text(token.getIdentifier());
                        credentialsToBeAdded.addToken(id, token);
                    }
                }

                // 為loginUser添加憑據(jù)
                loginUser.addCredentials(credentialsToBeAdded);
            }
        } else {
            // 如果沒有啟動(dòng)安全配置
            // 從當(dāng)前用戶憑據(jù)認(rèn)證
            // login with current user credentials (e.g. ticket cache, OS login)
            // note that the stored tokens are read automatically
            try {
                // Use reflection API to get the login user object
                // 反射調(diào)用如下方法
                // UserGroupInformation.loginUserFromSubject(null);
                Method loginUserFromSubjectMethod =
                    UserGroupInformation.class.getMethod(
                        "loginUserFromSubject", Subject.class);
                loginUserFromSubjectMethod.invoke(null, (Subject) null);
            } catch (NoSuchMethodException e) {
                LOG.warn("Could not find method implementations in the shaded jar.", e);
            } catch (InvocationTargetException e) {
                throw e.getTargetException();
            }

            // 獲取當(dāng)前登錄用戶
            loginUser = UserGroupInformation.getLoginUser();
        }

        LOG.info("Hadoop user set to {}", loginUser);

        if (HadoopUtils.isKerberosSecurityEnabled(loginUser)) {
            boolean isCredentialsConfigured =
                HadoopUtils.areKerberosCredentialsValid(
                loginUser, securityConfig.useTicketCache());

            LOG.info(
                "Kerberos security is enabled and credentials are {}.",
                isCredentialsConfigured ? "valid" : "invalid");
        }
    } catch (Throwable ex) {
        throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
    }
}

HadoopUtils.getHadoopConfiguration 方法

這個(gè)方法為讀取Hadoop配置文件的邏輯流部,較為復(fù)雜戚绕,接下來詳細(xì)分析下。

public static Configuration getHadoopConfiguration(
    org.apache.flink.configuration.Configuration flinkConfiguration) {

    // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
    // from the classpath

    // 創(chuàng)建個(gè)空的conf
    Configuration result = new HdfsConfiguration();
    // 標(biāo)記是否找到hadoop配置文件
    boolean foundHadoopConfiguration = false;

    // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
    // the hdfs configuration.
    // The properties of a newly added resource will override the ones in previous resources, so
    // a configuration
    // file with higher priority should be added later.

    // Approach 1: HADOOP_HOME environment variables
    // 保存兩個(gè)可能的hadoop conf路徑
    String[] possibleHadoopConfPaths = new String[2];

    // 獲取HADOOP_HOME環(huán)境變量的值
    final String hadoopHome = System.getenv("HADOOP_HOME");
    if (hadoopHome != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
        // 如果發(fā)現(xiàn)HADOOP_HOME環(huán)境變量的值
        // 嘗試分別從如下路徑獲戎健:
        // $HADOOP_HOME/conf
        // $HADOOP_HOME/etc/hadoop
        possibleHadoopConfPaths[0] = hadoopHome + "/conf";
        possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
    }

    for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
        if (possibleHadoopConfPath != null) {
            // 依次嘗試讀取possibleHadoopConfPath下的core-site.xml文件和hdfs-site.xml文件到hadoop conf中
            foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
        }
    }

    // Approach 2: Flink configuration (deprecated)
    // 獲取Flink配置項(xiàng) fs.hdfs.hdfsdefault 對(duì)應(yīng)的配置文件舞丛,加入hadoop conf
    final String hdfsDefaultPath =
        flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
    if (hdfsDefaultPath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        LOG.debug(
            "Using hdfs-default configuration-file path from Flink config: {}",
            hdfsDefaultPath);
        foundHadoopConfiguration = true;
    }
    
    // 獲取Flink配置項(xiàng) fs.hdfs.hadoopconf 對(duì)應(yīng)的配置文件,加入hadoop conf
    final String hdfsSitePath =
        flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
    if (hdfsSitePath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        LOG.debug(
            "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
        foundHadoopConfiguration = true;
    }

    // 獲取Flink配置項(xiàng) fs.hdfs.hadoopconf 對(duì)應(yīng)的配置文件果漾,加入hadoop conf
    final String hadoopConfigPath =
        flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
    if (hadoopConfigPath != null) {
        LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
        foundHadoopConfiguration =
            addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
    }

    // Approach 3: HADOOP_CONF_DIR environment variable
    // 從系統(tǒng)環(huán)境變量HADOOP_CONF_DIR目錄中讀取hadoop配置文件
    String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
    if (hadoopConfDir != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
        foundHadoopConfiguration =
            addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
    }

    // Approach 4: Flink configuration
    // add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
    // 讀取Flink配置文件中所有以flink.hadoop.為前綴的key
    // 將這些key截掉這個(gè)前綴作為新的key球切,和原先的value一起作為hadoop conf的配置項(xiàng),存放入hadoop conf
    for (String key : flinkConfiguration.keySet()) {
        for (String prefix : FLINK_CONFIG_PREFIXES) {
            if (key.startsWith(prefix)) {
                String newKey = key.substring(prefix.length());
                String value = flinkConfiguration.getString(key, null);
                result.set(newKey, value);
                LOG.debug(
                    "Adding Flink config entry for {} as {}={} to Hadoop config",
                    key,
                    newKey,
                    value);
                foundHadoopConfiguration = true;
            }
        }
    }

    // 如果以上途徑均未找到hadoop conf跨晴,顯示告警信息
    if (!foundHadoopConfiguration) {
        LOG.warn(
            "Could not find Hadoop configuration via any of the supported methods "
            + "(Flink configuration, environment variables).");
    }

    return result;
}

我們總結(jié)下Flink讀取Hadoop配置文件的完整邏輯欧聘,從上到下為讀取順序:

  • 讀取HADOOP_HOME環(huán)境變量片林,如果存在端盆,分別從它的confetc/hadoop目錄下讀取core-site.xmlhdfs-site.xml文件。
  • 從Flink配置文件的fs.hdfs.hdfsdefault配置項(xiàng)所在目錄下尋找费封。
  • 從Flink配置文件的fs.hdfs.hadoopconf配置項(xiàng)所在目錄下尋找焕妙。
  • HADOOP_CONF_DIR環(huán)境變量對(duì)應(yīng)的目錄下尋找。
  • 讀取Flink配置文件中所有以flink.hadoop.為前綴的key弓摘,將這些key截掉這個(gè)前綴作為新的key焚鹊,和原先的value一起作為hadoop conf的配置項(xiàng),存放入hadoop conf韧献。

JaasModule

install方法讀取了java.security.auth.login.config系統(tǒng)變量對(duì)應(yīng)的jaas配置末患,并且將Flink配置文件中相關(guān)配置轉(zhuǎn)換為JAAS中的entry,合并到系統(tǒng)變量對(duì)應(yīng)的jaas配置中并設(shè)置給JVM锤窑。代碼如下所示:

@Override
public void install() {

    // ensure that a config file is always defined, for compatibility with
    // ZK and Kafka which check for the system property and existence of the file
    // 讀取java.security.auth.login.config系統(tǒng)變量值璧针,用于在卸載module的時(shí)候恢復(fù)
    priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
    // 如果沒有配置
    if (priorConfigFile == null) {
        // Flink的 io.tmp.dirs配置項(xiàng)第一個(gè)目錄為workingDir
        // 將默認(rèn)的flink-jaas.conf文件寫入這個(gè)位置,創(chuàng)建臨時(shí)文件渊啰,名為jass-xxx.conf
        // 在JVM進(jìn)程關(guān)閉的時(shí)候刪除這個(gè)臨時(shí)文件
        File configFile = generateDefaultConfigFile(workingDir);
        // 配置java.security.auth.login.config系統(tǒng)變量值
        // 保證這個(gè)系統(tǒng)變量的值始終存在探橱,這是為了兼容Zookeeper和Kafka
        // 他們會(huì)去檢查這個(gè)jaas文件是否存在
        System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
        LOG.info("Jaas file will be created as {}.", configFile);
    }

    // read the JAAS configuration file
    // 讀取已安裝的jaas配置文件
    priorConfig = javax.security.auth.login.Configuration.getConfiguration();

    // construct a dynamic JAAS configuration
    // 包裝為DynamicConfiguration申屹,這個(gè)配置是可以修改的
    currentConfig = new DynamicConfiguration(priorConfig);

    // wire up the configured JAAS login contexts to use the krb5 entries
    // 從Flink配置文件中讀取kerberos配置
    // AppConfigurationEntry為Java讀取Jaas配置文件中一段配置項(xiàng)的封裝
    // 一段配置項(xiàng)指的是大括號(hào)之內(nèi)的配置
    AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
    if (krb5Entries != null) {
        // 遍歷Flink配置項(xiàng)security.kerberos.login.contexts,作為entry name使用
        for (String app : securityConfig.getLoginContextNames()) {
            // 將krb5Entries對(duì)應(yīng)的AppConfigurationEntry添加入currrentConfig
            // 使用security.kerberos.login.contexts對(duì)應(yīng)的entry name
            currentConfig.addAppConfigurationEntry(app, krb5Entries);
        }
    }

    // 設(shè)置新的currentConfig
    javax.security.auth.login.Configuration.setConfiguration(currentConfig);
}

上面代碼中getAppConfigurationEntries方法邏輯較為復(fù)雜隧膏,下面給出它的解析哗讥。

getAppConfigurationEntries方法從Flink的securityConfig中讀取配置,轉(zhuǎn)換為JAAS entry的格式胞枕,存入AppConfigurationEntry杆煞。如果Flink配置了security.kerberos.login.use-ticket-cache,加載類似如下內(nèi)容的文件曲稼,生成一個(gè)AppConfigurationEntry叫做userKerberosAce

EntryName {
    com.sun.security.auth.module.Krb5LoginModule optional
    doNotPrompt=true
    useTicketCache=true
    renewTGT=true;
};

如果Flink中配置了security.kerberos.login.keytab索绪,會(huì)加載如下配置,生成一個(gè)AppConfigurationEntry叫做keytabKerberosAce

EntryName {
    com.sun.security.auth.module.Krb5LoginModule required
    keyTab=keytab路徑
    doNotPrompt=true
    useKeyTab=true
    storeKey=true
    principal=principal名稱
    refreshKrb5Config=true;
};

getAppConfigurationEntries最后返回這兩個(gè)AppConfigurationEntry的集合贫悄,如果某一個(gè)為null瑞驱,只返回其中一個(gè)。

ZookeeperModule

install方法窄坦,主要作用為根據(jù)Flink配置唤反,設(shè)置Zookeeper相關(guān)的幾個(gè)系統(tǒng)變量的值。

@Override
public void install() throws SecurityInstallException {

    // 獲取zookeeper.sasl.client系統(tǒng)變量值鸭津,用于在卸載module的時(shí)候恢復(fù)
    priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
    // 讀取Flink配置項(xiàng)zookeeper.sasl.disable的值彤侍,根據(jù)其語義(取反)設(shè)置為zookeeper.sasl.client系統(tǒng)變量
    System.setProperty(
        ZK_ENABLE_CLIENT_SASL, String.valueOf(!securityConfig.isZkSaslDisable()));

    // 獲取zookeeper.sasl.client.username系統(tǒng)變量值,用于在卸載module的時(shí)候恢復(fù)
    priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
    // 讀取Flink配置項(xiàng)zookeeper.sasl.service-name
    // 如果不為默認(rèn)值zookeeper逆趋,設(shè)置zookeeper.sasl.client.username系統(tǒng)變量
    if (!"zookeeper".equals(securityConfig.getZooKeeperServiceName())) {
        System.setProperty(ZK_SASL_CLIENT_USERNAME, securityConfig.getZooKeeperServiceName());
    }

    // 獲取zookeeper.sasl.clientconfig系統(tǒng)變量值盏阶,用于在卸載module的時(shí)候恢復(fù)
    priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
    // 讀取Flink配置項(xiàng)zookeeper.sasl.login-context-name
    // 如果不為默認(rèn)值Client,設(shè)置zookeeper.sasl.clientconfig系統(tǒng)變量
    if (!"Client".equals(securityConfig.getZooKeeperLoginContextName())) {
        System.setProperty(
            ZK_LOGIN_CONTEXT_NAME, securityConfig.getZooKeeperLoginContextName());
    }
}

SecurityContext

顧名思義為安全環(huán)境上下文闻书,用于在不同認(rèn)證環(huán)境下執(zhí)行需要授權(quán)才能調(diào)用的邏輯名斟。

HadoopSecurityContext

HadoopSecurityContext用于在認(rèn)證過的UserGroupInformation中執(zhí)行邏輯(封裝在Callable中)。

public class HadoopSecurityContext implements SecurityContext {

    private final UserGroupInformation ugi;

    public HadoopSecurityContext(UserGroupInformation ugi) {
        this.ugi = Preconditions.checkNotNull(ugi, "UGI passed cannot be null");
    }

    public <T> T runSecured(final Callable<T> securedCallable) throws Exception {
        return ugi.doAs((PrivilegedExceptionAction<T>) securedCallable::call);
    }
}

NoOpSecurityContext

NoOpSecurityContext不做任何認(rèn)證魄眉,直接運(yùn)行Callable砰盐。

public class NoOpSecurityContext implements SecurityContext {

    @Override
    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        return securedCallable.call();
    }
}

本博客為作者原創(chuàng),歡迎大家參與討論和批評(píng)指正坑律。如需轉(zhuǎn)載請(qǐng)注明出處岩梳。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市晃择,隨后出現(xiàn)的幾起案子冀值,更是在濱河造成了極大的恐慌,老刑警劉巖宫屠,帶你破解...
    沈念sama閱讀 221,695評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件列疗,死亡現(xiàn)場離奇詭異,居然都是意外死亡激况,警方通過查閱死者的電腦和手機(jī)作彤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門膘魄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人竭讳,你說我怎么就攤上這事创葡。” “怎么了绢慢?”我有些...
    開封第一講書人閱讀 168,130評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵灿渴,是天一觀的道長。 經(jīng)常有香客問我胰舆,道長骚露,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,648評(píng)論 1 297
  • 正文 為了忘掉前任缚窿,我火速辦了婚禮棘幸,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘倦零。我一直安慰自己误续,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評(píng)論 6 397
  • 文/花漫 我一把揭開白布扫茅。 她就那樣靜靜地躺著蹋嵌,像睡著了一般。 火紅的嫁衣襯著肌膚如雪葫隙。 梳的紋絲不亂的頭發(fā)上栽烂,一...
    開封第一講書人閱讀 52,268評(píng)論 1 309
  • 那天,我揣著相機(jī)與錄音恋脚,去河邊找鬼腺办。 笑死,一個(gè)胖子當(dāng)著我的面吹牛慧起,可吹牛的內(nèi)容都是我干的菇晃。 我是一名探鬼主播册倒,決...
    沈念sama閱讀 40,835評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼蚓挤,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了驻子?” 一聲冷哼從身側(cè)響起灿意,我...
    開封第一講書人閱讀 39,740評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎崇呵,沒想到半個(gè)月后缤剧,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,286評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡域慷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評(píng)論 3 340
  • 正文 我和宋清朗相戀三年荒辕,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了汗销。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,505評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡抵窒,死狀恐怖弛针,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情李皇,我是刑警寧澤削茁,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站掉房,受9級(jí)特大地震影響茧跋,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜卓囚,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評(píng)論 3 333
  • 文/蒙蒙 一瘾杭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧哪亿,春花似錦富寿、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至银萍,卻和暖如春变勇,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背贴唇。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評(píng)論 1 272
  • 我被黑心中介騙來泰國打工搀绣, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人戳气。 一個(gè)月前我還...
    沈念sama閱讀 48,921評(píng)論 3 376
  • 正文 我出身青樓链患,卻偏偏與公主長得像,于是被迫代替她去往敵國和親瓶您。 傳聞我的和親對(duì)象是個(gè)殘疾皇子麻捻,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容