聊聊flink的SocketClientSink

本文主要研究一下flink的SocketClientSink

DataStream.writeToSocket

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Writes the DataStream to a socket as a byte array. The format of the
     * output is specified by a {@link SerializationSchema}.
     *
     * @param hostName
     *            host of the socket
     * @param port
     *            port of the socket
     * @param schema
     *            schema for serialization
     * @return the closed DataStream
     */
    @PublicEvolving
    public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
        DataStreamSink<T> returnStream = addSink(new SocketClientSink<>(hostName, port, schema, 0));
        returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
        return returnStream;
    }
  • DataStream的writeToSocket方法攻锰,內(nèi)部創(chuàng)建了SocketClientSink送讲,這里傳遞了四個構(gòu)造參數(shù),分別是hostName黎炉、port吏奸、schema憨愉、maxNumRetries(這里為0)

SocketClientSink

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java

/**
 * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
 *
 * <p>The sink can be set to retry message sends after the sending failed.
 *
 * <p>The sink can be set to 'autoflush', in which case the socket stream is flushed after every
 * message. This significantly reduced throughput, but also decreases message latency.
 *
 * @param <IN> data to be written into the Socket.
 */
@PublicEvolving
public class SocketClientSink<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);

    private static final int CONNECTION_RETRY_DELAY = 500;


    private final SerializableObject lock = new SerializableObject();
    private final SerializationSchema<IN> schema;
    private final String hostName;
    private final int port;
    private final int maxNumRetries;
    private final boolean autoFlush;

    private transient Socket client;
    private transient OutputStream outputStream;

    private int retries;

    private volatile boolean isRunning = true;

    /**
     * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
     * and will not auto-flush the stream.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema) {
        this(hostName, port, schema, 0);
    }

    /**
     * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
     * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
     * The sink will not auto-flush the stream.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     * @param maxNumRetries The maximum number of retries after a message send failed.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema, int maxNumRetries) {
        this(hostName, port, schema, maxNumRetries, false);
    }

    /**
     * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
     * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     * @param maxNumRetries The maximum number of retries after a message send failed.
     * @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema,
                            int maxNumRetries, boolean autoflush) {
        checkArgument(port > 0 && port < 65536, "port is out of range");
        checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");

        this.hostName = checkNotNull(hostName, "hostname must not be null");
        this.port = port;
        this.schema = checkNotNull(schema);
        this.maxNumRetries = maxNumRetries;
        this.autoFlush = autoflush;
    }

    // ------------------------------------------------------------------------
    //  Life cycle
    // ------------------------------------------------------------------------

    /**
     * Initialize the connection with the Socket in the server.
     * @param parameters Configuration.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            synchronized (lock) {
                createConnection();
            }
        }
        catch (IOException e) {
            throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
        }
    }


    /**
     * Called when new data arrives to the sink, and forwards it to Socket.
     *
     * @param value The value to write to the socket.
     */
    @Override
    public void invoke(IN value) throws Exception {
        byte[] msg = schema.serialize(value);

        try {
            outputStream.write(msg);
            if (autoFlush) {
                outputStream.flush();
            }
        }
        catch (IOException e) {
            // if no re-tries are enable, fail immediately
            if (maxNumRetries == 0) {
                throw new IOException("Failed to send message '" + value + "' to socket server at "
                        + hostName + ":" + port + ". Connection re-tries are not enabled.", e);
            }

            LOG.error("Failed to send message '" + value + "' to socket server at " + hostName + ":" + port +
                    ". Trying to reconnect..." , e);

            // do the retries in locked scope, to guard against concurrent close() calls
            // note that the first re-try comes immediately, without a wait!

            synchronized (lock) {
                IOException lastException = null;
                retries = 0;

                while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {

                    // first, clean up the old resources
                    try {
                        if (outputStream != null) {
                            outputStream.close();
                        }
                    }
                    catch (IOException ee) {
                        LOG.error("Could not close output stream from failed write attempt", ee);
                    }
                    try {
                        if (client != null) {
                            client.close();
                        }
                    }
                    catch (IOException ee) {
                        LOG.error("Could not close socket from failed write attempt", ee);
                    }

                    // try again
                    retries++;

                    try {
                        // initialize a new connection
                        createConnection();

                        // re-try the write
                        outputStream.write(msg);

                        // success!
                        return;
                    }
                    catch (IOException ee) {
                        lastException = ee;
                        LOG.error("Re-connect to socket server and send message failed. Retry time(s): " + retries, ee);
                    }

                    // wait before re-attempting to connect
                    lock.wait(CONNECTION_RETRY_DELAY);
                }

                // throw an exception if the task is still running, otherwise simply leave the method
                if (isRunning) {
                    throw new IOException("Failed to send message '" + value + "' to socket server at "
                            + hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
                }
            }
        }
    }

    /**
     * Closes the connection with the Socket server.
     */
    @Override
    public void close() throws Exception {
        // flag this as not running any more
        isRunning = false;

        // clean up in locked scope, so there is no concurrent change to the stream and client
        synchronized (lock) {
            // we notify first (this statement cannot fail). The notified thread will not continue
            // anyways before it can re-acquire the lock
            lock.notifyAll();

            try {
                if (outputStream != null) {
                    outputStream.close();
                }
            }
            finally {
                if (client != null) {
                    client.close();
                }
            }
        }
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    private void createConnection() throws IOException {
        client = new Socket(hostName, port);
        client.setKeepAlive(true);
        client.setTcpNoDelay(true);

        outputStream = client.getOutputStream();
    }

    // ------------------------------------------------------------------------
    //  For testing
    // ------------------------------------------------------------------------

    int getCurrentNumberOfRetries() {
        synchronized (lock) {
            return retries;
        }
    }
}
  • SocketClientSink繼承了RichSinkFunction岂座,其autoFlush屬性默認(rèn)為false
  • open方法里頭調(diào)用了createConnection陶因,來初始化與socket的連接,如果此時出現(xiàn)IOException巷嚣,則立馬fail fast喘先;createConnection的時候,這里設(shè)置的keepAlive及tcpNoDelay均為true
  • invoke方法首先調(diào)用schema.serialize方法來序列化value涂籽,然后調(diào)用socket的outputStream.write,如果autoFlush為true的話砸抛,則立馬flush outputStream评雌;如果出現(xiàn)IOException則立馬進行重試树枫,這里重試的邏輯直接寫在catch里頭,根據(jù)maxNumRetries來景东,重試的時候砂轻,就是先createConnection,然后調(diào)用outputStream.write斤吐,重試的delay為CONNECTION_RETRY_DELAY(500)

小結(jié)

  • DataStream的writeToSocket方法搔涝,內(nèi)部創(chuàng)建了SocketClientSink,默認(rèn)傳遞的maxNumRetries為0和措,而且沒有調(diào)用帶autoFlush屬性默認(rèn)為false的構(gòu)造器庄呈,其autoFlush屬性默認(rèn)為false
  • open方法創(chuàng)建的socket,其keepAlive及tcpNoDelay均為true派阱,如果open的時候出現(xiàn)IOException诬留,則里頭拋出異常終止運行
  • invoke方法比較簡單,就是使用SerializationSchema來序列化value贫母,然后write到outputStream文兑;這里進行了簡單的失敗重試,默認(rèn)的重試delay為CONNECTION_RETRY_DELAY(500)腺劣,這個版本實現(xiàn)的重試比較簡單绿贞,是同步進行的

doc

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市橘原,隨后出現(xiàn)的幾起案子籍铁,更是在濱河造成了極大的恐慌,老刑警劉巖靠柑,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件寨辩,死亡現(xiàn)場離奇詭異,居然都是意外死亡歼冰,警方通過查閱死者的電腦和手機靡狞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來隔嫡,“玉大人甸怕,你說我怎么就攤上這事∪鳎” “怎么了梢杭?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長秸滴。 經(jīng)常有香客問我武契,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任咒唆,我火速辦了婚禮届垫,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘全释。我一直安慰自己装处,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布浸船。 她就那樣靜靜地躺著妄迁,像睡著了一般。 火紅的嫁衣襯著肌膚如雪李命。 梳的紋絲不亂的頭發(fā)上登淘,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天,我揣著相機與錄音项戴,去河邊找鬼形帮。 笑死,一個胖子當(dāng)著我的面吹牛周叮,可吹牛的內(nèi)容都是我干的辩撑。 我是一名探鬼主播,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼仿耽,長吁一口氣:“原來是場噩夢啊……” “哼合冀!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起项贺,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤君躺,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后开缎,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體棕叫,經(jīng)...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年奕删,在試婚紗的時候發(fā)現(xiàn)自己被綠了俺泣。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,013評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡完残,死狀恐怖伏钠,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情谨设,我是刑警寧澤熟掂,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站扎拣,受9級特大地震影響赴肚,放射性物質(zhì)發(fā)生泄漏素跺。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一誉券、第九天 我趴在偏房一處隱蔽的房頂上張望亡笑。 院中可真熱鬧,春花似錦横朋、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至衙传,卻和暖如春决帖,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蓖捶。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工地回, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人俊鱼。 一個月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓刻像,卻偏偏與公主長得像,于是被迫代替她去往敵國和親并闲。 傳聞我的和親對象是個殘疾皇子细睡,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,960評論 2 355

推薦閱讀更多精彩內(nèi)容