Django:使用django-logpipe進行Kafka整合

django-logpipe庫

——該庫充當用于在Django應用程序和服務之間移動數(shù)據(jù)的通用管道奕塑。它建立在Boto3捂贿,Apache Kafkakafka-pythonDjango REST Framework之上。
安裝:pip install django-logpipe

將logpipe添加到已安裝的應用中:
INSTALLED_APPS = [ 
    ... 
    'logpipe',
    ... 
]

將連接設置添加到settings.py文件毯炮。配置Kafka是這樣的:

LOGPIPE = { 
    #必需設置
    'OFFSET_BACKEND':'logpipe.backend.kafka.ModelOffsetStore',
    'CONSUMER_BACKEND':'logpipe.backend.kafka.Consumer'耸黑,
    'PRODUCER_BACKEND':'logpipe.backend.kafka.Producer'桃煎,
    'KAFKA_BOOTSTRAP_SERVERS ':[ 
        'kafka:9092' 
    ],
    'KAFKA_CONSUMER_KWARGS':{ 
        'group_id':'django-logpipe'大刊,
    }为迈,

    #可選設置
    #'KAFKA_SEND_TIMEOUT':10缺菌,
    #'KAFKA_MAX_SEND_RETRIES':0葫辐,
    #'MIN_MESSAGE_LAG_MS':0 ,
    #'DEFAULT_FORMAT':'json'伴郁,
}

運行遷移python manage.py migrate logpipe
耿战。這將創(chuàng)建用于存儲Kafka日志位置偏移的模型:

用法

串行器

使用logpipe發(fā)送或接收消息的第一步是定義序列化程序。logpipe的序列化程序有一些規(guī)則:

  1. 必須是rest_framework.serializers.Serializer的子類或實現(xiàn)模仿rest_framework.serializers.Serializer的接口的類蛾绎。
  2. 必須在類上定義MESSAGE_TYPE屬性。該值應該是一個字符串鸦列,它定義唯一定義其主題/流中的數(shù)據(jù)類型租冠。
  3. 必須在類上定義VERSION屬性。該值應為表示模式版本號的單調整數(shù)薯嗤。
  4. 必須有KEY_FIELD在類上定義的屬性顽爹,表示要用作消息鍵的字段的名稱。消息密鑰由Kafka在執(zhí)行日志壓縮時使用骆姐,并由Kinesis用作分片分區(qū)鍵镜粤。對于不需要密鑰的主題捏题,可以省略該屬性。
  5. 如果序列化程序將用于傳入消息肉渴,則應實現(xiàn)類方法lookup_instance(cls公荧,** kwargs)。在實例化序列化程序之前同规,將直接使用消息數(shù)據(jù)作為關鍵字參數(shù)調用此類方法循狰。它應該查找并返回相關對象(如果存在),以便在初始化期間將其傳遞給序列化程序的實例參數(shù)券勺。如果還沒有對象存在(消息表示新對象)绪钥,則應返回None。
    下面是一個示例Django模型及其序列化程序关炼。
from django.db import models
from rest_framework import serializers
import uuid

class Person(models.Model):
    uuid = models.UUIDField(default=uuid.uuid4, unique=True)
    first_name = models.CharField(max_length=200)
    last_name = models.CharField(max_length=200)

class PersonSerializer(serializers.ModelSerializer):
    MESSAGE_TYPE = 'person'
    VERSION = 1
    KEY_FIELD = 'uuid'

    class Meta:
        model = Person
        fields = ['uuid', 'first_name', 'last_name']

    @classmethod
    def lookup_instance(cls, uuid, **kwargs):
        try:
            return Person.objects.get(uuid=uuid)
        except models.Person.DoesNotExist:
            pass
發(fā)送消息

一旦存在序列化程序程腹,就可以通過創(chuàng)建Producer對象并調用send方法向Kafka發(fā)送消息。

from logpipe import Producer
joe = Person.objects.create(first_name='Joe', last_name='Schmoe')
producer = Producer('people', PersonSerializer)
producer.send(joe)

上面的示例代碼將以下消息發(fā)送到名為people的Kafka topic 儒拂。

json:{
    "type":"person",
    "version":1,"message":{
        "first_name":"Joe",
        "last_name":"Schmoe",
        "uuid":"xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx"
    }
}
接收消息

為了處理傳入的消息寸潦,可以重用相同的模型和序列化器。我們只需要實例化一個Consumer對象侣灶。

# Watch for messages, but timeout after 1000ms of no messages
consumer = Consumer('people', consumer_timeout_ms=1000)
consumer.register(PersonSerializer)
consumer.run()

# Watch for messages and block forever
consumer = Consumer('people')
consumer.register(PersonSerializer)
consumer.run()

Consumer對象使用Django REST Framework的內置保存甸祭,創(chuàng)建和更新方法來應用消息。如果消息沒有直接綁定到Django模型褥影,可以跳過定義的lookup_instance類方法并覆蓋save方法來自定義邏輯池户。
如果在單個topic或stream中有多個數(shù)據(jù)類型,則可以通過向Consumer注冊多個序列化程序來使用它們凡怎。

consumer = Consumer('people')
consumer.register(PersonSerializer)
consumer.register(PlaceSerializer)
consumer.register(ThingSerializer)
consumer.run()

還可以通過為每種消息類型版本定義序列化程序并將其全部注冊到Consumer來支持多種不兼容的消息類型校焦。

consumer = Consumer('people')
consumer.register(PersonSerializerVersion1)
consumer.register(PersonSerializerVersion2)
consumer.register(PlaceSerializer)
consumer.register(ThingSerializer)
consumer.run()

如果有多個stream或topic,需要為每一個都創(chuàng)建一個Consumer统倒,并使用MultiConsumer查看這些stream或topic寨典。

from logpipe import MultiConsumer
people_consumer = Consumer('people')
people_consumer.register(PersonSerializer)
places_consumer = Consumer('places')
places_consumer.register(PlaceSerializer)
multi = MultiConsumer(people_consumer, places_consumer)

# Watch for 'people' and 'places' topics indefinitely
multi.run()

最后,可以通過build_kafka_consumer管理命令中的構建來自動注冊和運行Consumer房匆。

# myapp/apps.py
from django.apps import AppConfig
from logpipe import Consumer, register_consumer

class MyAppConfig(AppConfig):
    name = 'myapp'

# Register consumers with logpipe
@register_consumer
def build_person_consumer():
    consumer = Consumer('people')
    consumer.register(PersonSerializer)
    return consumer

使用register_consumer裝飾器可以根據(jù)需要注冊盡可能多的Consumer和topic耸成。然后運行run_kafka_consumer命令以循環(huán)方式自動處理所有Consumer的消息。python manage.py run_kafka_consumer

處理架構更改

使用每個序列化程序類所需的VERSION屬性處理架構更改浴鸿。發(fā)送時井氢,生產(chǎn)者在消息數(shù)據(jù)中包含模式版本號。然后岳链,當消費者收到消息時花竞,它會查找具有匹配版本號的寄存器序列化器。如果未找到具有匹配版本號的序列化程序掸哑,則會引發(fā)logpipe.exceptions.UnknownMessageVersionError異常约急。

要執(zhí)行向后不兼容的架構更改零远,應執(zhí)行以下步驟。

更新使用者代碼以了解新架構版本厌蔽。將生產(chǎn)者代碼更新為發(fā)送新架構版本牵辣。經(jīng)過一段時間后(當確定Kafka中仍然不存在舊版本消息時),請刪除與舊架構版本相關的代碼躺枕。
例如服猪,如果我們想要在上面定義的Person模型上需要一個電子郵件字段,那么第一步是更新消費者以了解新字段:

class Person(models.Model):
    uuid = models.UUIDField(default = uuid.uuid4拐云,unique = True)
    first_name = models.CharField(max_length = 200)
    last_name = models.CharField(max_length = 200)
    email = models.EmailField( max_length = 200罢猪,null = True)


class PersonSerializerV1(serializers.ModelSerializer):
    MESSAGE_TYPE = 'person'
    VERSION = 1
    KEY_FIELD = 'uuid'class 
    class Meta:
        model = Person 
        fields = ['uuid','first_name'叉瘩,'last_name'] 


class PersonSerializerV2(PersonSerializerV1):
    MESSAGE_TYPE ='person'
    VERSION = 2 
    class Meta(PersonSerializerV1.META):
        fields = ['uuid'膳帕,'first_name','last_name'薇缅,'email'] 

consumer = Consumer('people'危彩,consumer_timeout_ms = 1000)
consumer.register(PersonSerializerV1)
consumer.register(PersonSerializerV2)

消費者現(xiàn)在將使用適當?shù)男蛄谢绦騺盹@示消息版本。其次泳桦,我們需要將生產(chǎn)者代碼更新為使用模式版本2:

producer = Producer('people', PersonSerializerV2)

最后汤徽,在刪除所有舊版本1消息(通過日志壓縮)之后,可以從代碼庫中刪除PersonSerializerV1類灸撰。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末谒府,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子浮毯,更是在濱河造成了極大的恐慌完疫,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件债蓝,死亡現(xiàn)場離奇詭異壳鹤,居然都是意外死亡,警方通過查閱死者的電腦和手機饰迹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進店門芳誓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人啊鸭,你說我怎么就攤上這事锹淌。” “怎么了莉掂?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵葛圃,是天一觀的道長千扔。 經(jīng)常有香客問我憎妙,道長库正,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任厘唾,我火速辦了婚禮褥符,結果婚禮上,老公的妹妹穿的比我還像新娘抚垃。我一直安慰自己喷楣,他們只是感情好,可當我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布鹤树。 她就那樣靜靜地躺著铣焊,像睡著了一般。 火紅的嫁衣襯著肌膚如雪罕伯。 梳的紋絲不亂的頭發(fā)上曲伊,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天,我揣著相機與錄音追他,去河邊找鬼坟募。 笑死,一個胖子當著我的面吹牛邑狸,可吹牛的內容都是我干的懈糯。 我是一名探鬼主播,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼单雾,長吁一口氣:“原來是場噩夢啊……” “哼赚哗!你這毒婦竟也來了?” 一聲冷哼從身側響起铁坎,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤蜂奸,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后硬萍,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體扩所,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年朴乖,在試婚紗的時候發(fā)現(xiàn)自己被綠了祖屏。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡买羞,死狀恐怖袁勺,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情畜普,我是刑警寧澤期丰,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響钝荡,放射性物質發(fā)生泄漏街立。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一埠通、第九天 我趴在偏房一處隱蔽的房頂上張望赎离。 院中可真熱鬧,春花似錦端辱、人聲如沸梁剔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽荣病。三九已至,卻和暖如春渗柿,著一層夾襖步出監(jiān)牢的瞬間众雷,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工做祝, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留砾省,地道東北人。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓混槐,卻偏偏與公主長得像编兄,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子声登,可洞房花燭夜當晚...
    茶點故事閱讀 44,901評論 2 355