지난 시간에는 라즈베리파이를 이용하여 MQTT 프로토콜을 python으로 구현해보았다.
이번시간에는 MQTT 프로토콜 통신으로 받은 데이터를 LCFS Queue에 저장하는 기능을 구현해 보겠다.
추후 진행할 AoI(Age of Information) 이론을 기반으로 한 신선도 우선 전송 로직 구현을 위한 밑작업이다.
LCFS (Last Come First Serve) 란?
후입 선출의 개념, 우리가 익히 알고있는 stack 자료구조가 이에 해당한다.
하지만 우리가 구현하려는 LCFS 큐는 스택과는 달리
각 센서마다 할당된 버퍼가 있고, 같은 센서 정보에 대한 LCFS(후입선출)을 구현하는 것에 목표를 둔다.
가장 간단한 구현을 위해, 각 센서별 할당 버퍼 공간은 1개로 가정한다.
LCFS를 구현 할 환경

※서버 실제 전송까지는 구현하지않음.
LCFS 구현
GW를 담당하는 라즈베리파이 환경에 직접 LCFS 코드를 구현한다.
구현의 편의성을 위해 지난 시간 구현한 MQTT 통신 코드에 추가로 구현하였다.
mqtt_subscriber.py
import paho.mqtt.client as mqtt
import time, json
import threading
from datetime import datetime
# 데이터 버퍼링 클래스
class LCFS:
def __init__(self):
self.data = {}
# 센서 데이터 추가
def add_data(self, sensor_data):
device_id = sensor_data['device_id']
self.data[device_id] = sensor_data
print(f"updated data from {device_id}")
# 주기적으로 데이터 전송 (30초마다) (서버에 전송은 미구현)
def send_data(self):
while True:
time.sleep(30)
if not self.data:
continue
temp = self.data.copy()
self.data.clear()
for device_id, data in temp.items():
sent_dt = datetime.fromtimestamp(data['timestamp'])
# AoI(Age of Information을 기록, 추후에는 AoI에 따른 위상 변경(phase shift) 구현 예정)
aoi = (datetime.now() - sent_dt).total_seconds()
print(f"{device_id} data sent to server after updated {aoi:.1f} sec")
lcfs = LCFS()
buffer_lock = threading.Lock()
# MQTT 브로커 연결 콜백
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("connected succcessfully")
else:
print(f"connection failed with code {rc}")
client.subscribe("aoi/data")
# 메시지 수신 콜백
def on_message(client, userdata, msg):
try:
payload = json.loads(msg.payload.decode())
with buffer_lock:
lcfs.add_data(payload)
except Exception as e:
print("payload parse error:",e)
mqttc = mqtt.Client()
mqttc.on_connect = on_connect
mqttc.on_message = on_message
# 브로커 연결 및 데이터 전송 스레드 시작
mqttc.connect()
sender_thread = threading.Thread(
target= lcfs.send_data,
daemon=True
)
sender_thread.start()
mqttc.loop_forever()
Publisher에 대한 코드는 변경사항이 없으므로 따로 작성하지 않겠다.
추후 구현 확장성을 위해, 센서마다 서버에 데이터를 전송할 때 AoI를 기록하도록 구현하였다.
실행 결과

다음 시간에는 이번에 기록한 AoI (정보의 신선도) 데이터를 바탕으로 하여
버퍼 갱신 알고리즘 구현을 준비해보겠다.
'개인공부' 카테고리의 다른 글
| [Linux / SSH / IoT] Ansible을 활용한 환경 구성 자동화 (0) | 2026.01.05 |
|---|---|
| [MQTT / C++ / Python / IoT] Khadas VIM4 환경 온습도 센서 연결 (0) | 2025.12.31 |
| [MQTT / Python / IoT] Python으로 구현하는 MQTT (0) | 2025.12.01 |
| [NS-3] 2. second.cc 해체 분석 (0) | 2025.11.24 |
| [NS-3] α. NS-3 디버깅 기능 파헤치기 (0) | 2025.11.21 |