Kata - Valkey Publicar/Assinar (PubSub)
Kata - Valkey PubSub

Valkey1 (originado do Redis-7.2.4) é um repositório de dados de chave/valor de alto desempenho de código aberto que suporta uma variedade de cargas de trabalho, como cache e filas de mensagens.
O projeto é apoiado pela Linux Foundation.
Identificação do desafio
Publicar e obter mensagem publicada em diferentes canais.
- publicar mensagens em diferentes canais (exemplo: canal 1, canal 2, canal 3…, canal N)
- obter mensagens de dois diferentes canais que tenham registros de mensagens
Pistas ou dicas
Usar bibliotecas disponibilizadas em sua linguagem de domínio.
Em Python temos:
Recursos
-
Valkey com docker compose (
compose.yaml
)
Para inicializar o serviço local com Docker--- # doker compose up -d services: valkey: image: valkey/valkey:8.1-alpine ports: - 6379:6379
-
Cliente em linha de comando
valkey-cli --help # para detalhes de subcomandos e parâmetros
Obstáculos
Entender como funciona broker de mensagens e de chave/valor.
Solução
🙈 🙉 🙊
Publicador de mensagens
import asyncio
from sys import argv
from glide import GlideClient, GlideClientConfiguration, NodeAddress
publishing_config = GlideClientConfiguration([NodeAddress("localhost", 6379)])
async def main(message: str = "Test message", channel: str = "ch1"):
publishing_client = await GlideClient.create(publishing_config)
# publish message on ch1 channel
await publishing_client.publish(message, channel)
if __name__ == "__main__":
msg = argv[1] if len(argv) >= 2 else None
ch = argv[2] if len(argv) == 3 else None
if all([msg, ch]):
asyncio.run(main(msg, ch))
elif msg:
asyncio.run(main(msg))
else:
asyncio.run(main())
Consumidor de mensagens
import asyncio
from typing import Any
from glide import CoreCommands, GlideClient, GlideClientConfiguration, NodeAddress
def callback(msg: CoreCommands.PubSubMsg, context: Any):
print(f"Received {msg}, context {context}\n")
class Consumer:
_listening_client: GlideClient
_listening_config: GlideClientConfiguration
_context: dict
def __init__(self, context: dict = {"id": "consumer_id1"}):
self._context = context
self._listening_config = GlideClientConfiguration(
[NodeAddress("localhost", 6379)],
pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions( # subscriptions are configured here
channels_and_patterns={
GlideClientConfiguration.PubSubChannelModes.Exact: {
"ch1",
"ch2",
"ch3",
}, # Listens for messages published to 'ch1' and 'ch2' channel, in unsharded mode
GlideClientConfiguration.PubSubChannelModes.Pattern: {
"chat*"
}, # Listens for messages published to channels matched by 'chat*' glob pattern, in unsharded mode
},
callback=callback,
context=self._context,
),
)
async def main(self):
self._listening_client = await GlideClient.create(self._listening_config)
while True:
await asyncio.sleep(10)
async def close(self):
print("Consummer finalizado.")
await self._listening_client.close() # unsubscribe happens here
if __name__ == "__main__":
try:
consumer = Consumer()
asyncio.run(consumer.main())
except KeyboardInterrupt:
print("Consummer interrompido pelo usuário.")
finally:
asyncio.run(consumer.close())
Referências
-
Valkey NoSQL Database https://valkey.io/ ↩︎
-
pip install valkey-glide
https://github.com/valkey-io/valkey-py ↩︎ -
pip install valkey
https://github.com/valkey-io/valkey-py ↩︎