En este tutorial, creamos un flujo de trabajo basado en eventos completamente utilitario utilizando Kombutratando la correo como una capacidad arquitectónica central. Recorremos paso a paso la configuración de intercambios, claves de enrutamiento, trabajadores en segundo plano y productores concurrentes, lo que nos permite observar un sistema distribuido positivo. A medida que implementamos cada componente, vemos cómo el flujo de mensajes íntegro, el procesamiento asincrónico y los patrones de enrutamiento nos brindan el mismo poder del que dependen los microservicios de producción todos los días. Mira el CÓDIGOS COMPLETOS.
!pip install kombu
import threading
import time
import logging
import uuid
import datetime
import sys
from kombu import Connection, Exchange, Queue, Producer, Consumer
from kombu.mixins import ConsumerMixin
logging.basicConfig(
level=logging.INFO,
format="%(message)s",
handlers=(logging.StreamHandler(sys.stdout)),
force=True
)
logger = logging.getLogger(__name__)
BROKER_URL = "memory://localhost/"
Comenzamos instalando Kombu, importando dependencias y configurando el registro para que podamos ver claramente cada mensaje que fluye a través del sistema. Incluso configuramos la URL del agente en memoria, lo que nos permite ejecutar todo localmente en Colab sin carestia de RabbitMQ. Esta configuración constituye la cojín de nuestro flujo de trabajo de correo distribuida. Mira el CÓDIGOS COMPLETOS.
media_exchange = Exchange('media_exchange', type="topic", durable=True)
task_queues = (
Queue('video_queue', media_exchange, routing_key='video.#'),
Queue('audit_queue', media_exchange, routing_key='#'),
)
Definimos un intercambio de temas para enrutar mensajes de forma flexible utilizando patrones comodín. Incluso creamos dos colas: una dedicada a tareas relacionadas con vídeos y otra culo de auditoría que audición todo. Al utilizar el enrutamiento de temas, podemos controlar con precisión cómo fluyen los mensajes a través del sistema. Mira el CÓDIGOS COMPLETOS.
class Worker(ConsumerMixin):
def __init__(self, connection, queues):
self.connection = connection
self.queues = queues
self.should_stop = False
def get_consumers(self, Consumer, channel):
return (
Consumer(queues=self.queues,
callbacks=(self.on_message),
accept=('json'),
prefetch_count=1)
)
def on_message(self, body, message):
routing_key = message.delivery_info('routing_key')
payload_id = body.get('id', 'unknown')
logger.info(f"n⚡ RECEIVED MSG via key: ({routing_key})")
logger.info(f" Payload ID: {payload_id}")
try:
if 'video' in routing_key:
self.process_video(body)
elif 'audit' in routing_key:
logger.info(" 🔍 (Audit) Logging event...")
message.ack()
logger.info(f" ✅ ACKNOWLEDGED")
except Exception as e:
logger.error(f" ❌ ERROR: {e}")
def process_video(self, body):
logger.info(" ⚙️ (Processor) Transcoding video (Simulating work...)")
time.sleep(0.5)
Implementamos un trabajador personalizado usando ConsumerMixin de Kombu para ejecutarlo en un hilo en segundo plano. En la devolución de citación del mensaje, inspeccionamos la esencia de enrutamiento, invocamos la función de procesamiento adecuada y confirmamos el mensaje. Esta inmueble de trabajo nos brinda un consumo de mensajes íntegro y simultáneo con control total. Mira el CÓDIGOS COMPLETOS.
def publish_messages(connection):
producer = Producer(connection)
tasks = (
('video.upload', {'file': 'movie.mp4'}),
('user.login', {'user': 'admin'}),
)
logger.info("n🚀 PRODUCER: Starting to publish messages...")
for r_key, data in tasks:
data('id') = str(uuid.uuid4())(:8)
logger.info(f"📤 SENDING: {r_key} -> {data}")
producer.publish(
data,
exchange=media_exchange,
routing_key=r_key,
serializer="json"
)
time.sleep(1.5)
logger.info("🏁 PRODUCER: Done.")
Ahora creamos un productor que envía cargas aperos JSON estructuradas al intercambio con diferentes claves de enrutamiento. Generamos identificaciones únicas para cada evento y observamos cómo se enrutan a otras colas. Esto refleja la publicación de eventos de microservicios en el mundo positivo, donde los productores y consumidores permanecen desacoplados. Mira el CÓDIGOS COMPLETOS.
def run_example():
with Connection(BROKER_URL) as conn:
worker = Worker(conn, task_queues)
worker_thread = threading.Thread(target=worker.run)
worker_thread.daemon = True
worker_thread.start()
logger.info("✅ SYSTEM: Worker thread started.")
time.sleep(1)
try:
publish_messages(conn)
time.sleep(2)
except KeyboardInterrupt:
pass
finally:
worker.should_stop = True
logger.info("n👋 SYSTEM: Execution complete.")
if __name__ == "__main__":
run_example()
Iniciamos al trabajador en un hilo en segundo plano y despedimos al productor en el hilo principal. Esta estructura nos proporciona un mini sistema distribuido que se ejecuta en Colab. Al observar los registros, vemos mensajes publicados → enrutados → consumidos → reconocidos, completando el ciclo de vida completo del procesamiento de eventos.
En conclusión, organizamos una canalización de enrutamiento de tareas dinámica y distribuida que procesa eventos en tiempo positivo con claridad y precisión. Fuimos testigos de cómo Kombu abstrae la complejidad de los sistemas de correo y al mismo tiempo nos brinda un control detallado sobre el enrutamiento, el consumo y la concurrencia de los trabajadores. A medida que vemos que los mensajes pasan del productor al intercambio, de la culo al trabajador, apreciamos más profundamente la elegancia del diseño de sistemas basados en eventos y ahora estamos admisiblemente equipados para progresar esta cojín a microservicios robustos, procesadores en segundo plano y flujos de trabajo de nivel empresarial.
Mira el CÓDIGOS COMPLETOS. No dudes en consultar nuestra Página de GitHub para tutoriales, códigos y cuadernos. Encima, no dudes en seguirnos en Gorjeo y no olvides unirte a nuestro SubReddit de más de 100.000 ml y suscríbete a nuestro boletín.
Asif Razzaq es el director ejecutor de Marktechpost Media Inc.. Como emprendedor e ingeniero soñador, Asif está comprometido a servirse el potencial de la inteligencia químico para el admisiblemente social. Su esfuerzo más fresco es el tirada de una plataforma de medios de inteligencia químico, Marktechpost, que se destaca por su cobertura en profundidad del estudios espontáneo y las noticiario sobre estudios profundo que es técnicamente sólida y fácilmente comprensible para una amplia audiencia. La plataforma cuenta con más de 2 millones de visitas mensuales, lo que ilustra su popularidad entre el manifiesto.