Imagina que tienes algunos datos de transmisión. Podría ser desde un sensor de Internet de las cosas (IoT), una ingesta de datos de registros o incluso datos de impresiones del comprador. Independientemente de la fuente, a usted se le ha asignado la tarea de realizar sobre los datos: alertar o activar cuando ocurre poco. Martín Fowler dice: “Usted mismo puede crear un motor de reglas simple. Todo lo que necesitas es crear un montón de objetos con condiciones y acciones, almacenarlos en una colección y ejecutarlos para evaluar las condiciones y ejecutar las acciones”.
A motor de reglas de negocio (o simplemente motor de reglas) es un sistema de software que ejecuta muchas reglas basadas en alguna entrada para determinar alguna salida. De guisa simplista, son muchas declaraciones de “si, entonces”, “y” y “o” que se evalúan en función de algunos datos. Hay muchos sistemas de reglas de negocios diferentes, como Drools, OpenL Tablets o incluso RuleBook, y todos comparten poco en global: definen reglas (colección de objetos con condiciones) que se ejecutan (evalúan las condiciones) para derivar una salida (ejecutar las acciones). El próximo es un ejemplo simplista:
Cuando una condición única o una composición de condiciones se evalúa como verdadera, se desea despachar una alerta para realizar potencialmente sobre ese evento (activar el calor para calentar la habitación a 50 grados).
Esta publicación demuestra cómo implementar un motor de reglas dinámicas usando Servicio administrado de Amazon para Apache Flink. Nuestra implementación brinda la capacidad de crear reglas dinámicas que se pueden crear y poner al día sin la requisito de cambiar o retornar a implementar el código subyacente o la implementación del motor de reglas en sí. Analizamos la bloque, los servicios secreto de la implementación, algunos detalles de implementación que puede utilizar para crear su propio motor de reglas y una Kit de exposición de la nubarrón de AWS (AWS CDK) para implementar esto en su propia cuenta.
Descripción militar de la posibilidad
El flujo de trabajo de nuestra posibilidad comienza con la ingesta de datos. Suponemos que tenemos algunos datos de origen. Podría ser de varios lugares, pero para esta demostración utilizamos datos de transmisión (datos de sensores de IoT) como datos de entrada. Esto es sobre lo que evaluaremos nuestras reglas. Por ejemplo, supongamos que estamos viendo datos de nuestro termostato doméstico AnyCompany. Veremos atributos como temperatura, ocupación, humedad y más. El termostato publica los títulos respectivos cada minuto, por lo que basaremos nuestras reglas en torno a esa idea. Oportuno a que estamos ingiriendo estos datos casi en tiempo verdadero, necesitamos un servicio diseñado específicamente para este caso de uso. Para esta posibilidad utilizamos Flujos de datos de Amazon Kinesis.
En un motor de reglas tradicional, puede favor una repertorio finita de reglas. La creación de nuevas reglas probablemente implicaría una revisión y reimplementación del código cojín, un reemplazo de algún archivo de reglas o algún proceso de sobrescritura. Sin bloqueo, un motor de reglas dinámicas es diferente. Al igual que nuestros datos de entrada de transmisión, nuestras reglas todavía se pueden transmitir. Aquí podemos usar Kinesis Data Streams para transmitir nuestras reglas a medida que se crean.
En este punto, tenemos dos flujos de datos:
- Los datos sin procesar de nuestro termostato.
- Las reglas de negocio quizás creadas a través de una interfaz de afortunado.
El próximo diagrama ilustra cómo podemos conectar estas transmisiones entre sí.
Conectando transmisiones
Un caso de uso distintivo del servicio administrado para Apache Flink es consultar y analizar datos de forma interactiva en tiempo verdadero y producir continuamente información para casos de uso urgentes. Teniendo esto en cuenta, si tiene una regla que corresponde a que la temperatura caiga por debajo de un cierto valía (especialmente en invierno), puede ser fundamental evaluarla y producir un resultado lo más oportuno posible.
Los conectores Apache Flink son componentes de software que mueven datos internamente y fuera de una aplicación de servicio administrado para Apache Flink. Los conectores son integraciones flexibles que le permiten adivinar archivos y directorios. Consisten en módulos completos para interactuar con servicios de AWS y sistemas de terceros. Para obtener más detalles sobre los conectores, consulte Utilice conectores de Apache Flink con servicio administrado para Apache Flink.
Utilizamos dos tipos de conectores (operadores) para esta posibilidad:
- Fuentes – Proporcionar información a su aplicación desde un flujo de datos, un archivo u otra fuente de datos de Kinesis
- Fregaderos – Destinar resultados desde su aplicación a un flujo de datos de Kinesis, Manguera de datos de Amazon flujo u otro destino de datos
Las aplicaciones de Flink son flujos de datos en streaming que pueden ser transformados por operadores definidos por el afortunado. Estos flujos de datos forman gráficos dirigidos que comienzan con una o más fuentes y terminan en uno o más sumideros. El próximo diagrama ilustra un flujo de datos de ejemplo (fuente). Como se analizó anteriormente, tenemos dos flujos de datos de Kinesis que pueden estar de moda como fuentes para nuestro software Flink.
El próximo fragmento de código muestra cómo configuramos nuestras fuentes de Kinesis internamente de nuestro código Flink:
Usamos un estado de transmisiónque se puede utilizar para combinar y procesar conjuntamente dos flujos de eventos de una guisa específica. Un estado de transmisión es una buena opción para aplicaciones que necesitan unir un flujo de bajo rendimiento y un flujo de parada rendimiento o que necesitan poner al día dinámicamente su deducción de procesamiento. El próximo diagrama ilustra un ejemplo de cómo se conecta el estado de transmisión. Para más detalles, ver Una orientación destreza para el estado de transmisión en Apache Flink.
Esto se ajusta a la idea de nuestro motor de reglas dinámicas, donde tenemos un flujo de reglas de bajo rendimiento (que se agrega según sea necesario) y un flujo de transacciones de parada rendimiento (que ingresa en un intervalo regular, como uno por minuto). Este flujo de transmisión nos permite tomar nuestro flujo de transacciones (o los datos del termostato) y conectarlo al flujo de reglas como se muestra en el próximo fragmento de código:
Para obtener más información sobre el estado de transmisión, consulte El patrón de estado de transmisión. Cuando el flujo de difusión se conecta al flujo de datos (como en el ejemplo susodicho), se convierte en un BroadcastConnectedStream
. La función aplicada a este flujo, que nos permite procesar las transacciones y reglas, implementa el processBroadcastElement
método. El KeyedBroadcastProcessFunction
La interfaz proporciona tres métodos para procesar registros y emitir resultados:
- procesoBroadcastElement() – Esto se apasionamiento para cada registro del flujo transmitido (nuestro flujo de reglas).
- medio ambiente de proceso() – Esto se apasionamiento para cada registro de la secuencia secreto. Proporciona camino de solo ojeada al estado de transmisión para evitar modificaciones que resulten en diferentes estados de transmisión en las instancias paralelas de la función. El
processElement
El método recupera la regla del estado de transmisión y el evento del sensor susodicho del estado con secreto. Si la expresión se evalúa comoTRUE
(que se analiza en la próximo sección), se emitirá una alerta. - en el temporizador() – Esto se apasionamiento cuando se activa un temporizador previamente registrado. Los temporizadores se pueden registrar en el
processElement
método y se utilizan para realizar cálculos o erradicar estados en el futuro. Esto se utiliza en nuestro código para certificar que cualquier antecedente antiguo (según lo define nuestra regla) se elimine según sea necesario.
Podemos manejar la regla en la instancia del estado de transmisión de la próximo guisa:
Observe lo que sucede en el código cuando el estado de la regla es INACTIVE
. Esto eliminaría la regla del estado de transmisión, lo que entonces ya no consideraría que se utilice la regla. De guisa similar, manejar la transmisión de una regla que es ACTIVE
agregaría o reemplazaría la regla internamente del estado de transmisión. Esto nos permite realizar cambios dinámicamente, agregando y eliminando reglas según sea necesario.
Evaluar reglas
Las reglas se pueden evaluar de diversas formas. Aunque no es un requisito, nuestras reglas fueron creadas en un Verbo de expresión Java (JEXL) formato compatible. Esto nos permite evaluar reglas proporcionando una expresión JEXL inmediato con el contexto apropiado (las transacciones necesarias para reevaluar la regla o los pares clave-valor) y simplemente llamando al método de evaluación:
Una característica poderosa de JEXL es que no solo admite expresiones simples (como aquellas que incluyen comparación y aritmética), sino que todavía tiene soporte para funciones definidas por el afortunado. JEXL le permite seducir a cualquier método en un objeto Java usando la misma sintaxis. Si hay un POJO con el nombre SENSOR_cebb1baf_2df0_4267_b489_28be562fccea
que tiene el metodo hasNotChanged
llamarías a ese método usando la expresión. Puede encontrar más de estas funciones definidas por el afortunado que utilizamos en nuestro SensorMapState
clase.
Veamos un ejemplo de cómo funcionaría esto, utilizando una expresión de regla que dice lo próximo:
"SENSOR_cebb1baf_2df0_4267_b489_28be562fccea.hasNotChanged(5)"
Esta regla, evaluada por JEXL, equivaldría a un sensor que no ha cambiado en 5 minutos
La función correspondiente definida por el afortunado (parte de SensorMapState
) que está expuesto a JEXL (usando el contexto) es el próximo:
Los datos relevantes, como el que se muestra a continuación, irían a la ventana contextual, que luego se usaría para evaluar la regla.
En este caso, el resultado (o valía de isAlertTriggered
) es TRUE
.
Creando sumideros
Al igual que creamos fuentes anteriormente, todavía podemos crear sumideros. Estos sumideros se utilizarán como final de nuestro procesamiento de flujo, donde nuestros resultados analizados y evaluados se emitirán para uso futuro. Al igual que nuestra fuente, nuestro sumidero todavía es un flujo de datos de Kinesis, donde un consumidor Lambda descendente iterará los registros y los procesará para tomar la energía adecuada. Existen muchas aplicaciones del procesamiento posterior; por ejemplo, podemos conservar el resultado de esta evaluación, crear una notificación cibernética o poner al día un panel de reglas.
Con cojín en la evaluación susodicho, tenemos la próximo deducción internamente de la propia función del proceso:
Cuando la función de proceso emite la alerta, la respuesta de alerta se envía al receptor, que luego se puede adivinar y utilizar en sentido descendente en la bloque:
En este punto, podemos procesarlo. Tenemos una función Lambda registrando los registros donde podemos ver lo próximo:
Aunque se simplifican en este ejemplo, estos fragmentos de código forman la cojín para tomar los resultados de la evaluación y enviarlos a otra parte.
Conclusión
En esta publicación, demostramos cómo implementar un motor de reglas dinámicas utilizando Managed Service para Apache Flink con las reglas y los datos de entrada transmitidos a través de Kinesis Data Streams. Puedes estudiar más al respecto con el e-learning que tenemos disponibles.
A medida que las empresas buscan implementar motores de reglas casi en tiempo verdadero, esta bloque presenta una posibilidad convincente. El servicio administrado para Apache Flink ofrece poderosas capacidades para metamorfosear y analizar datos de transmisión en tiempo verdadero, al tiempo que simplifica la suministro de las cargas de trabajo de Flink y se integra perfectamente con otros servicios de AWS.
Para ayudarlo a comenzar con esta bloque, nos complace anunciar que publicaremos el código completo de nuestro motor de reglas como muestra en GitHub. Este ejemplo completo irá más allá de los fragmentos de código proporcionados en nuestra publicación y ofrecerá una vistazo más profunda a las complejidades de crear un motor de reglas dinámicas con Flink.
Le animamos a explorar este código de muestra, adaptarlo a su caso de uso específico y servirse todo el potencial del procesamiento de datos en tiempo verdadero en sus aplicaciones. Mira el repositorio de GitHuby no dude en comunicarse con nosotros si tiene alguna pregunta o comentario mientras se embarca en su alucinación con Flink y AWS.
Acerca de los autores
Steven Carpintero es desarrollador senior de soluciones en el equipo de ingeniería de clientes y prototipos de industrias de AWS (PACE), y ayuda a los clientes de AWS a dar vida a ideas innovadoras mediante la creación rápida de prototipos en la plataforma de AWS. Tiene una ingenio en Ciencias de la Computación de la Universidad Estatal Wayne en Detroit, Michigan. ¡Conéctate con Steven en LinkedIn!
Aravindharaj Rajendran es desarrollador senior de soluciones internamente del equipo de ingeniería de clientes y prototipos de industrias de AWS (PACE), con sede en Herndon, VA. Ayuda a los clientes de AWS a materializar sus ideas innovadoras mediante la creación rápida de prototipos utilizando la plataforma AWS. Fuera del trabajo, le encanta juguetear juegos de PC, bádminton y alucinar.