Kata - Valkey PubSub

Valkey1 (originado do Redis-7.2.4) é um repositório de dados de chave/valor de alto desempenho de código aberto que suporta uma variedade de cargas de trabalho, como cache e filas de mensagens.

O projeto é apoiado pela Linux Foundation.

Identificação do desafio

Publicar e obter mensagem publicada em diferentes canais.

  • publicar mensagens em diferentes canais (exemplo: canal 1, canal 2, canal 3…, canal N)
  • obter mensagens de dois diferentes canais que tenham registros de mensagens

Pistas ou dicas

Usar bibliotecas disponibilizadas em sua linguagem de domínio.

Em Python temos:

  • valkey-glide2 e valkey-py3

Recursos

  • Valkey com docker compose (compose.yaml)
    Para inicializar o serviço local com Docker

    ---
    # doker compose up -d
    services:
      valkey:
        image: valkey/valkey:8.1-alpine
        ports:
          - 6379:6379
    
  • Cliente em linha de comando

    valkey-cli --help # para detalhes de subcomandos e parâmetros
    

Obstáculos

Entender como funciona broker de mensagens e de chave/valor.

Solução

🙈 🙉 🙊

Publicador de mensagens
import asyncio
from sys import argv

from glide import GlideClient, GlideClientConfiguration, NodeAddress

publishing_config = GlideClientConfiguration([NodeAddress("localhost", 6379)])


async def main(message: str = "Test message", channel: str = "ch1"):
    publishing_client = await GlideClient.create(publishing_config)

    # publish message on ch1 channel
    await publishing_client.publish(message, channel)


if __name__ == "__main__":
    msg = argv[1] if len(argv) >= 2 else None
    ch = argv[2] if len(argv) == 3 else None
    if all([msg, ch]):
        asyncio.run(main(msg, ch))
    elif msg:
        asyncio.run(main(msg))
    else:
        asyncio.run(main())
Consumidor de mensagens
import asyncio
from typing import Any

from glide import CoreCommands, GlideClient, GlideClientConfiguration, NodeAddress


def callback(msg: CoreCommands.PubSubMsg, context: Any):
    print(f"Received {msg}, context {context}\n")


class Consumer:
    _listening_client: GlideClient
    _listening_config: GlideClientConfiguration
    _context: dict

    def __init__(self, context: dict = {"id": "consumer_id1"}):
        self._context = context

        self._listening_config = GlideClientConfiguration(
            [NodeAddress("localhost", 6379)],
            pubsub_subscriptions=GlideClientConfiguration.PubSubSubscriptions(  # subscriptions are configured here
                channels_and_patterns={
                    GlideClientConfiguration.PubSubChannelModes.Exact: {
                        "ch1",
                        "ch2",
                        "ch3",
                    },  # Listens for messages published to 'ch1' and 'ch2' channel, in unsharded mode
                    GlideClientConfiguration.PubSubChannelModes.Pattern: {
                        "chat*"
                    },  # Listens for messages published to channels matched by 'chat*' glob pattern, in unsharded mode
                },
                callback=callback,
                context=self._context,
            ),
        )

    async def main(self):
        self._listening_client = await GlideClient.create(self._listening_config)
        while True:
            await asyncio.sleep(10)

    async def close(self):
        print("Consummer finalizado.")
        await self._listening_client.close()  # unsubscribe happens here


if __name__ == "__main__":
    try:
        consumer = Consumer()
        asyncio.run(consumer.main())
    except KeyboardInterrupt:
        print("Consummer interrompido pelo usuário.")
    finally:
        asyncio.run(consumer.close())

Referências


  1. Valkey NoSQL Database https://valkey.io/ ↩︎

  2. pip install valkey-glide https://github.com/valkey-io/valkey-py ↩︎

  3. pip install valkey https://github.com/valkey-io/valkey-py ↩︎