본문 바로가기

개인공부

[MQTT / Python / IoT] LCFS Queue 구현

지난 시간에는 라즈베리파이를 이용하여 MQTT 프로토콜을 python으로 구현해보았다. 

 

이번시간에는 MQTT 프로토콜 통신으로 받은 데이터를 LCFS Queue에 저장하는 기능을 구현해 보겠다.

추후 진행할 AoI(Age of Information) 이론을 기반으로 한 신선도 우선 전송 로직 구현을 위한 밑작업이다.


LCFS (Last Come First Serve) 란?

후입 선출의 개념, 우리가 익히 알고있는 stack 자료구조가 이에 해당한다.

 

하지만 우리가 구현하려는 LCFS 큐는 스택과는 달리

각 센서마다 할당된 버퍼가 있고, 같은 센서 정보에 대한 LCFS(후입선출)을 구현하는 것에 목표를 둔다.

 

가장 간단한 구현을 위해, 각 센서별 할당 버퍼 공간은 1개로 가정한다.


LCFS를 구현 할 환경

MQTT를 통한 통신 구조

※서버 실제 전송까지는 구현하지않음.


LCFS 구현

GW를 담당하는 라즈베리파이 환경에 직접 LCFS 코드를 구현한다.

구현의 편의성을 위해 지난 시간 구현한 MQTT 통신 코드에 추가로 구현하였다.

 

mqtt_subscriber.py

import paho.mqtt.client as mqtt
import time, json
import threading
from datetime import datetime

# 데이터 버퍼링 클래스
class LCFS:
    def __init__(self):
        self.data = {}

    # 센서 데이터 추가
    def add_data(self, sensor_data):
        device_id = sensor_data['device_id']
        self.data[device_id] = sensor_data
        print(f"updated data from {device_id}")

    # 주기적으로 데이터 전송 (30초마다) (서버에 전송은 미구현)
    def send_data(self):
        while True:
            time.sleep(30)

            if not self.data:
                continue

            temp = self.data.copy()
            self.data.clear()

            for device_id, data in temp.items():
                sent_dt = datetime.fromtimestamp(data['timestamp'])
                # AoI(Age of Information을 기록, 추후에는 AoI에 따른 위상 변경(phase shift) 구현 예정)
                aoi = (datetime.now() - sent_dt).total_seconds()
                print(f"{device_id} data sent to server after updated {aoi:.1f} sec")

lcfs = LCFS()
buffer_lock = threading.Lock()

# MQTT 브로커 연결 콜백
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("connected succcessfully")
    else:
        print(f"connection failed with code {rc}")
    client.subscribe("aoi/data")

# 메시지 수신 콜백
def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload.decode()) 
        with buffer_lock:
            lcfs.add_data(payload)
    except Exception as e:
        print("payload parse error:",e)

mqttc = mqtt.Client()
mqttc.on_connect = on_connect
mqttc.on_message = on_message

# 브로커 연결 및 데이터 전송 스레드 시작
mqttc.connect()
sender_thread = threading.Thread(
    target= lcfs.send_data,
    daemon=True
    )
sender_thread.start()

mqttc.loop_forever()

 

Publisher에 대한 코드는 변경사항이 없으므로 따로 작성하지 않겠다.

 

추후 구현 확장성을 위해, 센서마다 서버에 데이터를 전송할 때 AoI를 기록하도록 구현하였다.

 


실행 결과

s1과 s2에서 온 데이터를 갱신하고 전송한다

 

다음 시간에는 이번에 기록한 AoI (정보의 신선도) 데이터를 바탕으로 하여

버퍼 갱신 알고리즘 구현을 준비해보겠다.