Solución Data Discovery Near Real Time

España se considera como un país con tráfico vial moderado. Solo en 2014 se tiene constancia de más de 30 millones de vehículos españoles en circulación, sin contar con otros aquellos que nos visitan por motivos de turismo y negocios. Todo ello hace que tengamos millones de desplazamientos de vehículos cada día. Nuestros cuerpos de fuerzas del estado hace una labor formidable por garantizar la seguridad en carretera pero la cifra mencionada hace que muchos vehículos de interés policial pasen desapercibidos a sus ojos salvo que estén cometiendo una ilegalidad en ese momento o detecten algo extraño.




Por ello mi proyecto permite analizar en “tiempo real” las matrículas obtenidas por diversas fuentes (cámaras de tráfico, peajes, parking públicos y privados) y las contraste con los otras fuentes de instituciones públicas aportando toda la información posible. En esta maqueta analizaremos si los vehículos tienen el seguro de suscripción obligatorio y si se encuentra matriculado correctamente.

Bases de datos utilizadas

Vehículos Datos sobre marca, estado, matriculación, seguro. Base de datos ficticia alojada en MariaDB
Cámaras Eventos de vehículos registrados por una cámara
Ciudadanos Información de personas físicas, alojada en MariaDB 
Radares Listado de radares con velocidad en la península ibérica


Arquitectura diseñada


Partimos de la base en la que se recibe en una base de datos SQL los eventos de las cámaras recogiendo un identificador de la misma y la matrícula del vehículo detectado. Para realizar esta simulación hemos utilizado LAMP tanto en la instancia de Amazon como la local (Linux Mint).

LAMP es un kit de aplicaciones enfocados a la Web (Apache, MySQL) con el que se mediante un archivo PHP (http://IP/genera_matricula.php) genero el timestamp y los registros aleatorios de Nº de radar y matricula almacenando el resultado en base de datos. El Nº de radar corresponde con datos reales de radares y las matrículas son generadas entre 1000BBB y 9999BBB. La generación de datos en bloque se realiza mediante un script en Bash iterativo: Genera 4000 registros por cada ejecución, que paralelizando se pueden crear 100.000 registros en 1-2 minutos.
Un ejemplo de output del script sería el siguiente.
<html><head><meta http-equiv="refresh" content="1; url=" />
</head><body><b>Sentencia ejecutada: </b>INSERT INTO camaras (matricula,num_camara) VALUES ES ("1501BBB",4552)<hr /><h2>Registros en BBDD: 94009</h2></body></html>

<html><head><meta http-equiv="refresh" content="1; url=" />
</head><body><b>Sentencia ejecutada: </b>INSERT INTO camaras (matricula,num_camara) VALUES ("9924BBB",6535)<hr /><h2>Registros en BBDD: 94012</h2></body></html>

<html><head><meta http-equiv="refresh" content="1; url=" />
</head><body><b>Sentencia ejecutada: </b>INSERT INTO camaras (matricula,num_camara) VALUES ("5497BBB",2887)<hr /><h2>Registros en BBDD: 94016</h2></body></html>

LAMP Por sí solo no realiza nada más salvo la generación, es Maxwell quien está a la escucha de eventos sobre la base de datos (en este caso inserciones) del servidor MySQL en tiempo real formando una cadena JSON con toda la información y se lo entrega al websocket 9092: la parte suscriptora de Apache Kafka. Su salida por consola es la siguiente:
23:02:48,478 DEBUG MaxwellKafkaProducer - ->  key:{"database":"dgt","table":"camaras","pk.id":87244}, partition:0, offset:17
23:02:48,480 DEBUG MaxwellKafkaProducer -    {"database":"dgt","table":"camaras","type":"insert","ts":1453500168,"xid":3039,"commit":true,"data":{"num_camara":5602,"matricula":"9261BBB","id":87244}}
23:02:48,480 DEBUG MaxwellKafkaProducer -    BinlogPosition[master.000036:6545]


Apache Kafka  es un sistema suscriptor/publicador que procesa grandes flujos de datos. Su modo de funcionamiento es levantar un socket (Por defecto en el puerto 9092) asignándole un nombre o tópico identificativo (en este caso el tópico se llama “cámaras”) para separar los diferentes flujos de datos. Se suscribe a los eventos que devuelve Maxwell y los almacena en su espacio local hasta que pueden ser entregados a Apache Spark, esto quiere decir, hace de Staging hasta que pueden ser procesado y consumidos por la herramienta conectada en el lado de publicador, en este caso es Apache Spark Streaming. Su Output se ha omitido por temas de rendimiento, es muy similar al recibido por Maxwell.

Spark Streaming es un sistema de procesamiento de datos ultra rápido el cual se encarga de realizar el filtrado de información a lo requerido (En la presentación se filtrarán de los miles de matrículas las recibidas con terminación en “97BBB”). Spark Streaming levanta varios procesos: uno como máster, que coordina a los workers y el resto como workers, procesos que consumirán los flujos recibidos. Todo esto viene apoyado por un interfaz gráfico donde podemos visualizar las estadísticas de procesamiento.


En esta fase del proyecto existen varios outputs interesantes en Spark. Primero los que muestra el interfaz gráfico:

Mientras que por consola muestra cómo cada uno de los workers dentro del batch ha tenido coincidencias y las ha escrito en la base de datos de Cassandra. (Keyspace “datawarehouse”, tabla “spark”).
16/01/23 00:32:30 INFO Table Writer: Wrote 0 rows to datawarehouse.spark in 0,000 s.
16/01/23 00:32:30 INFO TableWriter: Wrote 0 rows to datawarehouse.spark in 0,012 s.
16/01/23 00:32:30 INFO TableWriter: Wrote 1 rows to datawarehouse.spark in 0,007 s.
16/01/23 00:32:30 INFO TableWriter: Wrote 2 rows to datawarehouse.spark in 0,009 s.
16/01/23 00:32:35 INFO TableWriter: Wrote 0 rows to datawarehouse.spark in 0,000 s.
16/01/23 00:32:35 INFO TableWriter: Wrote 0 rows to datawarehouse.spark in 0,010 s.
16/01/23 00:32:35 INFO TableWriter: Wrote 0 rows to datawarehouse.spark in 0,000 s.
16/01/23 00:32:35 INFO TableWriter: Wrote 1 rows to datawarehouse.spark in 0,005 s.

Tanto Apache Kafka como Apache Spark son aplicaciones Stateless (sin control de estado), tolerante a fallos y la no pérdida de información pero es necesario que exista un tercero capaz de orquestar el funcionamiento de forma eficiente. Para ello utilizamos ZooKeeper.

Apache ZooKeeper es una aplicación que provee un servicio de configuración centralizada donde el resto de aplicaciones se conectan para conocer su estado y modo de operación. En este caso Spark Streaming consulta a ZooKeeper su disposición de consumir los flujos de datos del tópico “cámaras”.  ZooKeeper no tiene un Output que mostrar.
Una vez que Spark consume los flujos de datos y filtra por los terminados en “97BBB” almacena los resultados en una base de datos Cassandra.

Apache Cassandra es una base de datos NoSQL distribuida basada en información clave-valor.
La salida que podemos ver en Cassandra es la propia información que se ha registrado correctamente:
Ya en el entorno de Windows tenemos instalado el programa de visualización Tableau Desktop 9.2, Tableau provee una fuerte capa de análisis generando Dashboard interactivos.
Sin embargo, de forma inicial no es posible cargar datos de Cassandra en Tableau. Se ha implementado un Driver ODBC Cassandra específico para Windows desarrollado por Simba Technologies.

Por tanto Tableau se conecta al DataWarehouse de MariaDB y a Cassandra por ODBC, mostrando todos los datos cruzados en un Dashboard con mapas provistos por MapBox, proveedor de mapas compatible con mayor nivel de detalle y mapas personalizados.
En el Dashboard se visualiza en la parte principal un mapa de la península con todos los registros detectados, en los que recibimos mayor nivel de detalle al pasar el ratón por encima. En la parte inferior se ve un contador agrupando la cantidad de resultados segmentado por el tipo de alarma. A la derecha hay una representación similar.
En la parte superior derecha podemos seleccionar los datos por fecha con unos botones de avance automático. Por limitaciones del driver ODBC Cassandra la actualización de la fuente de datos todavía no es automática cada varios segundos por lo que se requiere actualizar la fuente de datos.
Los iconos en verde o en rojo varían según el estado del vehículo detectado: Si un registro coincide con que NO tiene seguro o el coche esta dado de BAJA se muestra en el mapa en rojo. En otro caso el icono se muestra en verde.  Además cada registro cuenta con información adicional del vehículo y los datos de su titular.

Una de las características que se ha provisto al Dashboard es poder traspasar la información a un sistema ajeno mediante peticiones HTTP API REST. En este caso hemos utilizado Elastix, distribución Linux de comunicaciones unificadas que permite enviar mensajes de texto, llamadas, faxes y mensajería instantánea. En nuestro caso se presentan dos opciones telefónicas que recibe la distribución y genera una llamada con una locución automática.




Leave a Reply