django-logpipe庫
——該庫充當用于在Django應用程序和服務之間移動數(shù)據(jù)的通用管道奕塑。它建立在Boto3捂贿,Apache Kafka,kafka-python和Django REST Framework之上。
安裝:pip install django-logpipe
將logpipe添加到已安裝的應用中:
INSTALLED_APPS = [
...
'logpipe',
...
]
將連接設置添加到settings.py文件毯炮。配置Kafka是這樣的:
LOGPIPE = {
#必需設置
'OFFSET_BACKEND':'logpipe.backend.kafka.ModelOffsetStore',
'CONSUMER_BACKEND':'logpipe.backend.kafka.Consumer'耸黑,
'PRODUCER_BACKEND':'logpipe.backend.kafka.Producer'桃煎,
'KAFKA_BOOTSTRAP_SERVERS ':[
'kafka:9092'
],
'KAFKA_CONSUMER_KWARGS':{
'group_id':'django-logpipe'大刊,
}为迈,
#可選設置
#'KAFKA_SEND_TIMEOUT':10缺菌,
#'KAFKA_MAX_SEND_RETRIES':0葫辐,
#'MIN_MESSAGE_LAG_MS':0 ,
#'DEFAULT_FORMAT':'json'伴郁,
}
運行遷移python manage.py migrate logpipe
耿战。這將創(chuàng)建用于存儲Kafka日志位置偏移的模型:
用法
串行器
使用logpipe發(fā)送或接收消息的第一步是定義序列化程序。logpipe的序列化程序有一些規(guī)則:
- 必須是rest_framework.serializers.Serializer的子類或實現(xiàn)模仿rest_framework.serializers.Serializer的接口的類蛾绎。
- 必須在類上定義MESSAGE_TYPE屬性。該值應該是一個字符串鸦列,它定義唯一定義其主題/流中的數(shù)據(jù)類型租冠。
- 必須在類上定義VERSION屬性。該值應為表示模式版本號的單調整數(shù)薯嗤。
- 必須有KEY_FIELD在類上定義的屬性顽爹,表示要用作消息鍵的字段的名稱。消息密鑰由Kafka在執(zhí)行日志壓縮時使用骆姐,并由Kinesis用作分片分區(qū)鍵镜粤。對于不需要密鑰的主題捏题,可以省略該屬性。
- 如果序列化程序將用于傳入消息肉渴,則應實現(xiàn)類方法lookup_instance(cls公荧,** kwargs)。在實例化序列化程序之前同规,將直接使用消息數(shù)據(jù)作為關鍵字參數(shù)調用此類方法循狰。它應該查找并返回相關對象(如果存在),以便在初始化期間將其傳遞給序列化程序的實例參數(shù)券勺。如果還沒有對象存在(消息表示新對象)绪钥,則應返回None。
下面是一個示例Django模型及其序列化程序关炼。
from django.db import models
from rest_framework import serializers
import uuid
class Person(models.Model):
uuid = models.UUIDField(default=uuid.uuid4, unique=True)
first_name = models.CharField(max_length=200)
last_name = models.CharField(max_length=200)
class PersonSerializer(serializers.ModelSerializer):
MESSAGE_TYPE = 'person'
VERSION = 1
KEY_FIELD = 'uuid'
class Meta:
model = Person
fields = ['uuid', 'first_name', 'last_name']
@classmethod
def lookup_instance(cls, uuid, **kwargs):
try:
return Person.objects.get(uuid=uuid)
except models.Person.DoesNotExist:
pass
發(fā)送消息
一旦存在序列化程序程腹,就可以通過創(chuàng)建Producer對象并調用send方法向Kafka發(fā)送消息。
from logpipe import Producer
joe = Person.objects.create(first_name='Joe', last_name='Schmoe')
producer = Producer('people', PersonSerializer)
producer.send(joe)
上面的示例代碼將以下消息發(fā)送到名為people的Kafka topic 儒拂。
json:{
"type":"person",
"version":1,"message":{
"first_name":"Joe",
"last_name":"Schmoe",
"uuid":"xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx"
}
}
接收消息
為了處理傳入的消息寸潦,可以重用相同的模型和序列化器。我們只需要實例化一個Consumer對象侣灶。
# Watch for messages, but timeout after 1000ms of no messages
consumer = Consumer('people', consumer_timeout_ms=1000)
consumer.register(PersonSerializer)
consumer.run()
# Watch for messages and block forever
consumer = Consumer('people')
consumer.register(PersonSerializer)
consumer.run()
Consumer對象使用Django REST Framework的內置保存甸祭,創(chuàng)建和更新方法來應用消息。如果消息沒有直接綁定到Django模型褥影,可以跳過定義的lookup_instance類方法并覆蓋save方法來自定義邏輯池户。
如果在單個topic或stream中有多個數(shù)據(jù)類型,則可以通過向Consumer注冊多個序列化程序來使用它們凡怎。
consumer = Consumer('people')
consumer.register(PersonSerializer)
consumer.register(PlaceSerializer)
consumer.register(ThingSerializer)
consumer.run()
還可以通過為每種消息類型版本定義序列化程序并將其全部注冊到Consumer來支持多種不兼容的消息類型校焦。
consumer = Consumer('people')
consumer.register(PersonSerializerVersion1)
consumer.register(PersonSerializerVersion2)
consumer.register(PlaceSerializer)
consumer.register(ThingSerializer)
consumer.run()
如果有多個stream或topic,需要為每一個都創(chuàng)建一個Consumer统倒,并使用MultiConsumer查看這些stream或topic寨典。
from logpipe import MultiConsumer
people_consumer = Consumer('people')
people_consumer.register(PersonSerializer)
places_consumer = Consumer('places')
places_consumer.register(PlaceSerializer)
multi = MultiConsumer(people_consumer, places_consumer)
# Watch for 'people' and 'places' topics indefinitely
multi.run()
最后,可以通過build_kafka_consumer管理命令中的構建來自動注冊和運行Consumer房匆。
# myapp/apps.py
from django.apps import AppConfig
from logpipe import Consumer, register_consumer
class MyAppConfig(AppConfig):
name = 'myapp'
# Register consumers with logpipe
@register_consumer
def build_person_consumer():
consumer = Consumer('people')
consumer.register(PersonSerializer)
return consumer
使用register_consumer裝飾器可以根據(jù)需要注冊盡可能多的Consumer和topic耸成。然后運行run_kafka_consumer命令以循環(huán)方式自動處理所有Consumer的消息。python manage.py run_kafka_consumer
處理架構更改
使用每個序列化程序類所需的VERSION屬性處理架構更改浴鸿。發(fā)送時井氢,生產(chǎn)者在消息數(shù)據(jù)中包含模式版本號。然后岳链,當消費者收到消息時花竞,它會查找具有匹配版本號的寄存器序列化器。如果未找到具有匹配版本號的序列化程序掸哑,則會引發(fā)logpipe.exceptions.UnknownMessageVersionError異常约急。
要執(zhí)行向后不兼容的架構更改零远,應執(zhí)行以下步驟。
更新使用者代碼以了解新架構版本厌蔽。將生產(chǎn)者代碼更新為發(fā)送新架構版本牵辣。經(jīng)過一段時間后(當確定Kafka中仍然不存在舊版本消息時),請刪除與舊架構版本相關的代碼躺枕。
例如服猪,如果我們想要在上面定義的Person模型上需要一個電子郵件字段,那么第一步是更新消費者以了解新字段:
class Person(models.Model):
uuid = models.UUIDField(default = uuid.uuid4拐云,unique = True)
first_name = models.CharField(max_length = 200)
last_name = models.CharField(max_length = 200)
email = models.EmailField( max_length = 200罢猪,null = True)
class PersonSerializerV1(serializers.ModelSerializer):
MESSAGE_TYPE = 'person'
VERSION = 1
KEY_FIELD = 'uuid'class
class Meta:
model = Person
fields = ['uuid','first_name'叉瘩,'last_name']
class PersonSerializerV2(PersonSerializerV1):
MESSAGE_TYPE ='person'
VERSION = 2
class Meta(PersonSerializerV1.META):
fields = ['uuid'膳帕,'first_name','last_name'薇缅,'email']
consumer = Consumer('people'危彩,consumer_timeout_ms = 1000)
consumer.register(PersonSerializerV1)
consumer.register(PersonSerializerV2)
消費者現(xiàn)在將使用適當?shù)男蛄谢绦騺盹@示消息版本。其次泳桦,我們需要將生產(chǎn)者代碼更新為使用模式版本2:
producer = Producer('people', PersonSerializerV2)
最后汤徽,在刪除所有舊版本1消息(通過日志壓縮)之后,可以從代碼庫中刪除PersonSerializerV1類灸撰。