본문 바로가기

개인공부

[MQTT / Python / IoT] AoI를 고려한 큐 버퍼 관리 알고리즘 개발

지난시간에는 실제 센서데이터 확보를 위한 작업을 진행하였다.

이제 우리의 원래 목적으로 돌아가서, 이 프로젝트를 마무리 지을 시간이 왔다.

 

LCFS큐 구조에서 AoI (정보의 신선도)를 고려하여

게이트웨이 버퍼의 데이터 중 서버로 전송할 데이터를 선출하는 알고리즘을 개발해보자.


1. 제약 조건 정의

  • 4개의 센서 노드와 1개의 게이트웨이 노드. 1개의 서버 DB로 구성 
  • Sensor -> GW 데이터는 무조건 append
  • GW -> Server 전송 시의 버퍼 관리만 고려
  • 우선순위는 알람 메시지가 항상 더 높음
  • 이상치는 모두 노이즈가 아닌 실제 알람 값으로 고려
  • 각 센서가 발행하는 데이터와 주기는 아래 그림과 같음

센서 발행 데이터와 주기

 

센서 데이터 전송 환경에서 단순 AoI만 고려하게 된다면, 당연히 슬롯 1개를 할당하는 LCFS구조가 가장 좋지만

여러 비용을 함께 고려함으로써, 더 다양한 알고리즘을 테스트 해보기로 하였다.

2. 목적 함수 정의

목적함수 및 변수 정의

∆ : 센서 데이터 val의 변화량 (직전 받은 데이터 val - 현재 받은 데이터 val)
S_diff : 건너 뛴 센서 데이터의 시퀀스 번호 (갱신 효율을 계산하기 위함)
AoI (Age of Information): 정보의 신선도 (현재 시각 - 데이터 생성 시각)
IAT (Inter Arrive Time) : 서버가 데이터를 수신하는 주기 (즉시 전송 알고리즘 고려)
Q_len :  서버로 전송된 큐의 길이 (payload 부하 고려)

 

상술했듯이 다양한 비용을 고려한 알고리즘 도출을 위해 5개의 변수를 설정하고,

앞의 두 변수는 클수록, 뒤의 세 변수는 작을수록 좋은 점수를 받도록 목적함수를 정의하였다.

 

이를 통해 각 비용에 최적화된 알고리즘들을 고안해 볼 수 있게 되었다.


3. 다양한 버퍼 관리 알고리즘 정의

번호 알고리즘명 내용
1 LCFS 기본 LCFS 버퍼 (우선순위 고려 x)
2 FIFO 기본 FIFO
3 PRIORITY_V1 LCFS + 우선순위, 센서별 전송 슬롯 1칸 할당
4 PRIORITY_V2 LCFS + 우선순위, 센서별 전송 슬롯 2칸 할당
(센서 슬롯 1+ 알람 슬롯 1)
5 IMMEDIATE_PRIORITY_V1 알람 수신 즉시 (1초 안에) 전송 + PRIORITY_V1
6 IMMEDIATE_PRIORITY_V2 알람 수신 즉시 (1초 안에) 전송 + PRIORITY_V2

 

위와 같이 다양한 알고리즘을 고안하였고, 아래와 같은 코드로 구현하였다.

 

4. 알고리즘 코드 구현

class LCFS:
    def __init__(self):
        self.data = deque(maxlen=20)
        self.server_data = {}
        self.SEQ = 0
        self.alarm_event = threading.Event()

    def add_data(self, sensor_data):
        device_id = sensor_data['device_id']
        message_type = sensor_data.get('message_type', 'sensor')
        
        self.data.append(sensor_data)
        
        # [IMMEDIATE 모드] 5, 6번 알고리즘: 알람 시 즉시 전송 트리거
        if 'IMMEDIATE' in ALGORITHM and sensor_data.get('message_type') == 'alarm':
            print(f"{device_id} 알람 발생 즉시 전송")
            self.alarm_event.set()
        else:
            print(f"[{device_id}] data: {sensor_data['data']}")
    def select_sensor_data(self, method):
        selected_data = {}
        # 최신순으로 순회하기 위해 역순 리스트 생성
        reversed_data = list(self.data)[::-1]

        # [1] LCFS: 센서별 최신 1개
        if method == 'LCFS':
            for data in reversed_data:
                did = data['device_id']
                if did not in selected_data:
                    selected_data[did] = data
        
        # [2] FIFO: 센서별 가장 오래된 1개 (정방향 순회)
        elif method == 'FIFO':
            for data in self.data:
                did = data['device_id']
                if did not in selected_data:
                    selected_data[did] = data

        # [3] & [5] PRIORITY_V1 계열 (센서별 슬롯 1개: 알람 우선)
        # - 알람이 있으면 최신 알람 1개 선택 (일반 데이터 무시/덮어쓰기)
        # - 알람이 없으면 최신 일반 데이터 1개 선택
        elif method in ['PRIORITY_V1', 'IMMEDIATE_PRIORITY_V1']:
            for data in reversed_data:
                did = data['device_id']
                m_type = data.get('message_type', 'sensor')
                
                if did not in selected_data:
                    # 빈 슬롯이면 무조건 채움 (일단 최신 데이터 확보)
                    selected_data[did] = data
                else:
                    # 이미 데이터가 있는데, 기존꺼는 '일반'이고 지금 본 게 '알람'이면 교체
                    # (알람 우선순위 적용)
                    if selected_data[did].get('message_type') != 'alarm' and m_type == 'alarm':
                        selected_data[did] = data
                        
        # [4] & [6] PRIORITY_V2 계열 (센서별 슬롯 2개: 알람1 + 데이터1)
        # - 센서별로 최신 알람 1개, 최신 일반 데이터 1개 각각 확보하여 전송
        elif method in ['PRIORITY_V2', 'IMMEDIATE_PRIORITY_V2']:
            seen_normal = set()
            seen_alarm = set()
            
            for data in reversed_data:
                did = data['device_id']
                m_type = data.get('message_type', 'sensor')

                if m_type == 'alarm':
                    if did not in seen_alarm:
                        # 알람은 별도 키로 저장하여 일반 데이터와 공존 가능하게 함
                        unique_key = f"{did}_alarm_latest"
                        selected_data[unique_key] = data
                        seen_alarm.add(did)
                else:
                    if did not in seen_normal:
                        selected_data[did] = data
                        seen_normal.add(did)
        
        else:
            # 예외: 기본 LCFS 동작
            return self.select_sensor_data('LCFS')

        return selected_data

    def send_data(self):
        while True:
            is_event_triggered = self.alarm_event.wait(timeout=10)
            
            if is_event_triggered and 'IMMEDIATE' in ALGORITHM:
                # 알람 때문에 깨어난 경우, 연속된 알람을 모으기 위해 잠시 대기
                time.sleep(1.0) 
            
            # 대기 후 플래그 초기화
            self.alarm_event.clear()

            if not self.data:
                continue

            # 선택 알고리즘 실행
            new_sensor_data_map = self.select_sensor_data(method=ALGORITHM)
            
            if not new_sensor_data_map:
                continue
            
            # DB 저장 Payload 생성
            gateway_id = "gw1"
            timestamp = time.time()
            seq_number = self.SEQ
            q_length = len(new_sensor_data_map)
            payload = json.dumps(new_sensor_data_map)

            self.SEQ += 1

            try:
                with conn.cursor() as cursor:
                    sql = "INSERT INTO sensor_logs (gateway_id, raw_timestamp, seq_number, q_length, payload) VALUES (%s, %s, %s, %s, %s)"
                    cursor.execute(sql, (gateway_id, timestamp, seq_number, q_length, payload))
                conn.commit()

                print(f"DB 저장 성공: {cursor.rowcount} rows affected")
            except Exception as e:
                print(f"DB Error: {e}")

 

이제 코드까지 구현하였으니, 실제 성능 측정을 진행하도록 하자.


5. 성능 측정 시나리오 정의

 

먼저 성능 측정을 위한 시나리오를 정의하였다.

시나리오 공통

  • 직전 5분 간 전체 센서 score의 평균을 측정
  • 패킷 로스는 발생하지 않는 환경으로 가정

시나리오 1

  • 알람 데이터 센싱 환경 조성을 위한 센서별 조작 2회
  • 센서 조작 주기 : 1분, 3분(15초간)
  • GW Buffer 크기 : 30 (제한적 버퍼)

시나리오 2

  • 알람 데이터 센싱 환경 조성을 위한 센서별 조작 4회
  • 센서 조작 주기 : 1분, 2분, 3분, 4분(15초간)
  • GW Buffer 크기 : 30 (제한적 버퍼)

시나리오 3

  • 알람 데이터 센싱 환경 조성을 위한 센서별 조작 4회
  • 센서 조작 주기 : 1분, 2분, 3분, 4분(15초간)
  • GW Buffer 크기 : 60 (여유로운 버퍼)

 

이후 시나리오 별 테스트 과정 및 결과를 쉽게 확인할 수 있도록, 대시보드를 구현하였다.

 

6. 시각화 대시보드 구현

시각화 대시보드 페이지

화면 구성

  • 센서별 정보 섹션
    • 장치 명
    • Efficiency(정규화 된 RawScore)
    • Q(Q_length)
    • IAT(Inter-Arrive-Time)
    • Seq(현재 SEQ번호)
      • 이전 SEQ와의 차
    • Delay(신선도)
    • 차트 영역
      • 센서 값 그래프
      • RawScore 그래프
  • 알람 섹션
    • 5분간 전체 시스템 평균 RawScore
    • 알람 로그 영역

 

7. 성능 측정

 

7. 성능 측정 결과 비교 분석

번호 알고리즘명 시나리오 1 시나리오 2 시나리오 3 비고
1 LCFS 5.5 6.7 6.4 우선순위 고려 x (∆↓)
2 FIFO 1.8 3.2 1.6 알람 미발생시 0점
3 PRIORITY_V1 6.4 6.4 5.5  
4 PRIORITY_V2 6.2 6.3 5.2 V1대비  Q_len↑
5 IMMEDIATE_PRIORITY_V1 6.1 7.3 6.5 일반 PRIORITY 대비 s_diff↓
6 IMMEDIATE_PRIORITY_V2 6.4 5.6 4.9 V1대비  Q_len↑

 

LCFS

  • 알람 데이터의 우선순위를 무시하고 갱신하기 때문에 알람 타이밍에 따른 점수 변동이 매우 큼 (∆↓) 
  • 알람 발생 횟수가 많을수록 알람 데이터가 갱신될 가능성이 커 높은 점수를 받음

FIFO

  • 전체적인 성능지표에서 전부 낮게 측정
  • 알람 미발생 환경에서는 0점을 기록

PRIORITY_V1 & V2

  • 다른 지표는 거의 동일하나, Q_len의 차이로 인한 성능 차이를 보임 (V1대비 V2 Q_len)

IMMEDIATE_PRIORITY_V1 & V2

  • 알람 발생 시의 RawScore 점수가 매우 높으나, 알람 빈도에 따른 점수 변동이 큼
  • 특히 알람 발생이 잦은 환경에서, V2는 매번 센서당 슬롯을 2개씩 할당하여 보내기 때문에 S_diff 변수에 대한 점수를 기대하기 어려움

 


8. 최종 결론

  • 제안된 목적 함수 (AoI, ∆,S_diff,Q_len) 및 가중치를 기준으로 평가할 때,  IMMEDIATE_PRIORITY_V1 알고리즘이 가장 효율적임.
  • 알람의 즉시 전송을 통해 정보의 가치를 극대화하면서도, 단일 슬롯 구조로 시스템 부하 (Q_len) 를 최소화하여 성능과 효율성의 균형(Trade-off) 을 가장 잘 유지함.

9. 한계

  • 센서 데이터 전송 주기와 GW 전송 주기에 대한 Sync 고려 x
  • 이상치 데이터에 대한 노이즈 구별 x
  • Khadas VIM4의 특성(고성능 CPU)과 맞지 않는 라즈베리파이 센서 장치 사용으로 인한 센싱 안정성 문제
  • 고정된 가중치를 사용한 테스트만 진행하여 여러 상황 별 특화된 알고리즘 도출 불가

10. 향후 과제

  • 센서 데이터 전송 주기와 GW 전송 주기에 대한 Sync 를 고려한 phase shift 구현
  • 이상치 데이터에 대한 노이즈 구별을 위한 데이터 모니터링 시스템 추가
  • 기기간 호환성을 고려한 통신 환경 구축
  • 가중치 설정을 다양화하여 각 상황별 최적의 알고리즘 도출

 

이번 AoI 프로젝트를 진행하며 IoT 네트워크 기술과 간단한 임베디드 시스템 구현을 경험하였다.

 

프로젝트 진행 과정 속에서 다양한 도구와 장치를 접하며 많은 난관이 있었지만,

앞으로의 개발에 있어 많은 도움이 되는 경험이었다.

 

다음에는 원래 진행하던 NS-3 개인 공부를 다시 이어나가도록 하겠다.