python—socket服务器接收以及数据存储

  • ~43.63K 字
  1. 1. 项目结构
  2. 2. 1. config.yml
  3. 3. 2. config_loader.py
  4. 4. 3. db_handler.py
  5. 5. 4. cache_manager.py
  6. 6. 5. dtu_server.py
  7. 7. 6. web_interface.py
  8. 8. 7. main.py
  9. 9. 8. Web界面模板 (templates/index.html)
  10. 10. 9. 其他模板文件
  • 系统功能总结
    1. 1. 1. MySQL数据存储
    2. 2. 2. 配置文件管理
    3. 3. 3. Web界面
    4. 4. 4. 模块化设计
    5. 5. 5. 缓存机制
  • 部署说明
  • 扩展建议

  • 下面我将按照您的要求对程序进行扩充,包括配置文件管理、MySQL存储、Web界面和缓存机制。由于代码量较大,我会分模块实现,并确保每个功能清晰可维护。

    项目结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    dtu_server/
    ├── config.yml # 配置文件
    ├── main.py # 主程序入口
    ├── dtu_server.py # DTU服务器核心
    ├── db_handler.py # 数据库处理模块
    ├── web_interface.py # Web界面模块
    ├── cache_manager.py # 缓存管理模块
    ├── config_loader.py # 配置加载模块
    └── templates/ # Web界面模板
    ├── index.html
    ├── devices.html
    ├── data.html
    └── connections.html

    1. config.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    # 服务器配置
    server:
    host: "0.0.0.0"
    main_port: 22334
    web_port: 8080
    heartbeat_timeout: 60
    max_missed: 3

    # MySQL数据库配置
    mysql:
    host: "192.168.1.100"
    port: 3306
    user: "abc"
    password: "abc@123"
    database: "dtu_data"
    pool_size: 5

    # 缓存配置
    cache:
    max_size: 1000
    flush_interval: 10

    # 查询任务配置
    query_tasks:
    1:
    start_addr: 2000
    length: 100
    interval: 30.0
    trigger: "timer"
    2:
    start_addr: 2200
    length: 15
    interval: 60.0
    trigger: "timer"
    3:
    start_addr: 2300
    length: 10
    interval: 0.0
    trigger: "heartbeat"
    4:
    start_addr: 2400
    length: 20
    interval: 120.0
    trigger: "timer"

    # 数据点配置
    data_points:
    2000:
    name: "temperature"
    type: "int"
    length: 2
    scale: 0.1
    2010:
    name: "humidity"
    type: "float"
    length: 4
    scale: null
    2020:
    name: "pressure"
    type: "int"
    length: 2
    scale: 0.01
    2030:
    name: "flow_rate"
    type: "float"
    length: 4
    scale: null
    2040:
    name: "status"
    type: "int"
    length: 2
    scale: 1
    2200:
    name: "voltage"
    type: "int"
    length: 2
    scale: 0.1
    2202:
    name: "current"
    type: "int"
    length: 2
    scale: 0.01
    2300:
    name: "rpm"
    type: "int"
    length: 2
    scale: 1
    2400:
    name: "frequency"
    type: "int"
    length: 2
    scale: 0.1

    2. config_loader.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    import yaml
    import os

    class ConfigLoader:
    def __init__(self, config_path="config.yml"):
    self.config_path = config_path
    self.config = self._load_config()

    def _load_config(self):
    """加载YAML配置文件"""
    if not os.path.exists(self.config_path):
    raise FileNotFoundError(f"配置文件 {self.config_path} 不存在")

    with open(self.config_path, 'r') as f:
    return yaml.safe_load(f)

    def get_server_config(self):
    """获取服务器配置"""
    return self.config.get('server', {})

    def get_mysql_config(self):
    """获取MySQL配置"""
    return self.config.get('mysql', {})

    def get_cache_config(self):
    """获取缓存配置"""
    return self.config.get('cache', {})

    def get_query_tasks(self):
    """获取查询任务配置"""
    tasks = self.config.get('query_tasks', {})
    # 转换为有序字典,保持配置顺序
    return {int(k): v for k, v in tasks.items()}

    def get_data_points(self):
    """获取数据点配置"""
    return self.config.get('data_points', {})

    def get_web_port(self):
    """获取Web端口"""
    return self.config.get('server', {}).get('web_port', 8080)

    3. db_handler.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    307
    308
    309
    310
    311
    312
    313
    314
    315
    316
    317
    318
    319
    320
    321
    322
    323
    324
    325
    326
    327
    328
    329
    330
    331
    332
    333
    334
    335
    336
    337
    338
    339
    340
    341
    342
    343
    import mysql.connector
    from mysql.connector import Error
    from mysql.connector.pooling import MySQLConnectionPool
    import logging

    class DBServer:
    """MySQL数据库管理类"""
    def __init__(self, config):
    self.config = config
    self.connection_pool = None
    self._initialize_pool()
    self._create_tables()

    def _initialize_pool(self):
    """初始化数据库连接池"""
    try:
    # 从配置中提取参数
    pool_config = {
    'host': self.config.get('host'),
    'port': self.config.get('port', 3306),
    'user': self.config.get('user'),
    'password': self.config.get('password'),
    'database': self.config.get('database'),
    'pool_name': 'dtu_pool',
    'pool_size': self.config.get('pool_size', 5),
    'autocommit': True
    }

    self.connection_pool = MySQLConnectionPool(**pool_config)
    logging.info("MySQL连接池初始化成功")
    except Error as e:
    logging.error(f"创建MySQL连接池失败: {str(e)}")
    self.connection_pool = None

    def _create_tables(self):
    """创建必要的数据库表"""
    if not self.connection_pool:
    return

    try:
    conn = self.connection_pool.get_connection()
    cursor = conn.cursor()

    # 创建设备信息表
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS devices (
    id INT AUTO_INCREMENT PRIMARY KEY,
    device_id INT NOT NULL,
    ip_address VARCHAR(20) NOT NULL,
    port INT NOT NULL,
    first_seen DATETIME NOT NULL,
    last_seen DATETIME NOT NULL,
    status ENUM('online', 'offline') DEFAULT 'online',
    UNIQUE KEY unique_device (device_id)
    )
    """)

    # 创建设备数据表
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS device_data (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    device_id INT NOT NULL,
    data_point VARCHAR(50) NOT NULL,
    value FLOAT NOT NULL,
    raw_value BIGINT,
    timestamp DATETIME NOT NULL,
    INDEX idx_device (device_id),
    INDEX idx_timestamp (timestamp)
    )
    """)

    # 创建连接历史表
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS connection_history (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    device_id INT NOT NULL,
    event ENUM('connect', 'disconnect', 'heartbeat_timeout') NOT NULL,
    timestamp DATETIME NOT NULL,
    details VARCHAR(255)
    )
    """)

    # 创建心跳记录表
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS heartbeat_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    device_id INT NOT NULL,
    timestamp DATETIME NOT NULL,
    INDEX idx_device (device_id),
    INDEX idx_timestamp (timestamp)
    )
    """)

    # 创建原始数据包表
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS raw_packets (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    device_id INT NOT NULL,
    packet_type ENUM('register', 'heartbeat', 'data', 'modbus') NOT NULL,
    raw_data BLOB,
    timestamp DATETIME NOT NULL,
    INDEX idx_device (device_id),
    INDEX idx_timestamp (timestamp)
    )
    """)

    conn.commit()
    logging.info("数据库表创建/验证成功")
    except Error as e:
    logging.error(f"创建数据库表失败: {str(e)}")
    finally:
    if conn.is_connected():
    cursor.close()
    conn.close()

    def log_device_connection(self, device_id, ip, port, event, details=None):
    """记录设备连接事件"""
    if not self.connection_pool:
    return

    try:
    conn = self.connection_pool.get_connection()
    cursor = conn.cursor()

    # 记录连接历史
    cursor.execute("""
    INSERT INTO connection_history
    (device_id, event, timestamp, details)
    VALUES (%s, %s, %s, %s)
    """, (device_id, event, datetime.now(), details))

    # 更新设备状态
    if event == 'connect':
    cursor.execute("""
    INSERT INTO devices
    (device_id, ip_address, port, first_seen, last_seen, status)
    VALUES (%s, %s, %s, %s, %s, 'online')
    ON DUPLICATE KEY UPDATE
    ip_address = VALUES(ip_address),
    port = VALUES(port),
    last_seen = VALUES(last_seen),
    status = 'online'
    """, (device_id, ip, port, datetime.now(), datetime.now()))
    elif event in ('disconnect', 'heartbeat_timeout'):
    cursor.execute("""
    UPDATE devices
    SET status = 'offline'
    WHERE device_id = %s
    """, (device_id,))

    conn.commit()
    except Error as e:
    logging.error(f"记录设备连接事件失败: {str(e)}")
    finally:
    if conn.is_connected():
    cursor.close()
    conn.close()

    def log_heartbeat(self, device_id):
    """记录设备心跳"""
    if not self.connection_pool:
    return

    try:
    conn = self.connection_pool.get_connection()
    cursor = conn.cursor()

    # 记录心跳
    cursor.execute("""
    INSERT INTO heartbeat_log
    (device_id, timestamp)
    VALUES (%s, %s)
    """, (device_id, datetime.now()))

    # 更新最后在线时间
    cursor.execute("""
    UPDATE devices
    SET last_seen = %s
    WHERE device_id = %s
    """, (datetime.now(), device_id))

    conn.commit()
    except Error as e:
    logging.error(f"记录心跳失败: {str(e)}")
    finally:
    if conn.is_connected():
    cursor.close()
    conn.close()

    def save_data_point(self, device_id, data_point, value, raw_value):
    """保存数据点到数据库"""
    if not self.connection_pool:
    return False

    try:
    conn = self.connection_pool.get_connection()
    cursor = conn.cursor()

    cursor.execute("""
    INSERT INTO device_data
    (device_id, data_point, value, raw_value, timestamp)
    VALUES (%s, %s, %s, %s, %s)
    """, (device_id, data_point, value, raw_value, datetime.now()))

    conn.commit()
    return True
    except Error as e:
    logging.error(f"保存数据点失败: {str(e)}")
    return False
    finally:
    if conn.is_connected():
    cursor.close()
    conn.close()

    def save_raw_packet(self, device_id, packet_type, raw_data):
    """保存原始数据包到数据库"""
    if not self.connection_pool:
    return False

    try:
    conn = self.connection_pool.get_connection()
    cursor = conn.cursor()

    cursor.execute("""
    INSERT INTO raw_packets
    (device_id, packet_type, raw_data, timestamp)
    VALUES (%s, %s, %s, %s)
    """, (device_id, packet_type, raw_data, datetime.now()))

    conn.commit()
    return True
    except Error as e:
    logging.error(f"保存原始数据包失败: {str(e)}")
    return False
    finally:
    if conn.is_connected():
    cursor.close()
    conn.close()

    def get_connected_devices(self):
    """获取在线设备列表"""
    if not self.connection_pool:
    return []

    try:
    conn = self.connection_pool.get_connection()
    cursor = conn.cursor(dictionary=True)

    cursor.execute("""
    SELECT * FROM devices
    WHERE status = 'online'
    ORDER BY last_seen DESC
    """)

    return cursor.fetchall()
    except Error as e:
    logging.error(f"获取在线设备失败: {str(e)}")
    return []
    finally:
    if conn.is_connected():
    cursor.close()
    conn.close()

    def get_device_data(self, device_id, limit=100):
    """获取设备数据"""
    if not self.connection_pool:
    return []

    try:
    conn = self.connection_pool.get_connection()
    cursor = conn.cursor(dictionary=True)

    cursor.execute("""
    SELECT * FROM device_data
    WHERE device_id = %s
    ORDER BY timestamp DESC
    LIMIT %s
    """, (device_id, limit))

    return cursor.fetchall()
    except Error as e:
    logging.error(f"获取设备数据失败: {str(e)}")
    return []
    finally:
    if conn.is_connected():
    cursor.close()
    conn.close()

    def get_connection_history(self, limit=100):
    """获取连接历史"""
    if not self.connection_pool:
    return []

    try:
    conn = self.connection_pool.get_connection()
    cursor = conn.cursor(dictionary=True)

    cursor.execute("""
    SELECT * FROM connection_history
    ORDER BY timestamp DESC
    LIMIT %s
    """, (limit,))

    return cursor.fetchall()
    except Error as e:
    logging.error(f"获取连接历史失败: {str(e)}")
    return []
    finally:
    if conn.is_connected():
    cursor.close()
    conn.close()

    def get_raw_packets(self, device_id=None, limit=100):
    """获取原始数据包"""
    if not self.connection_pool:
    return []

    try:
    conn = self.connection_pool.get_connection()
    cursor = conn.cursor(dictionary=True)

    if device_id:
    cursor.execute("""
    SELECT * FROM raw_packets
    WHERE device_id = %s
    ORDER BY timestamp DESC
    LIMIT %s
    """, (device_id, limit))
    else:
    cursor.execute("""
    SELECT * FROM raw_packets
    ORDER BY timestamp DESC
    LIMIT %s
    """, (limit,))

    return cursor.fetchall()
    except Error as e:
    logging.error(f"获取原始数据包失败: {str(e)}")
    return []
    finally:
    if conn.is_connected():
    cursor.close()
    conn.close()

    4. cache_manager.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    import threading
    import time
    import logging
    from collections import deque
    from datetime import datetime

    class DataCache:
    """数据缓存管理类"""
    def __init__(self, max_size=1000, flush_interval=10, db_handler=None):
    self.max_size = max_size
    self.flush_interval = flush_interval
    self.db_handler = db_handler
    self.data_points = deque(maxlen=max_size)
    self.raw_packets = deque(maxlen=max_size)
    self.running = False
    self.lock = threading.Lock()

    def start(self):
    """启动缓存刷新线程"""
    self.running = True
    flush_thread = threading.Thread(target=self._flush_thread)
    flush_thread.daemon = True
    flush_thread.start()
    logging.info("数据缓存刷新线程已启动")

    def stop(self):
    """停止缓存刷新线程"""
    self.running = False
    self.flush_all() # 停止前刷新所有数据
    logging.info("数据缓存刷新线程已停止")

    def add_data_point(self, device_id, data_point, value, raw_value):
    """添加数据点到缓存"""
    with self.lock:
    self.data_points.append({
    'device_id': device_id,
    'data_point': data_point,
    'value': value,
    'raw_value': raw_value,
    'timestamp': datetime.now()
    })

    def add_raw_packet(self, device_id, packet_type, raw_data):
    """添加原始数据包到缓存"""
    with self.lock:
    self.raw_packets.append({
    'device_id': device_id,
    'packet_type': packet_type,
    'raw_data': raw_data,
    'timestamp': datetime.now()
    })

    def flush_all(self):
    """立即刷新所有缓存数据到数据库"""
    with self.lock:
    # 刷新数据点
    while self.data_points:
    item = self.data_points.popleft()
    if self.db_handler:
    self.db_handler.save_data_point(
    item['device_id'],
    item['data_point'],
    item['value'],
    item['raw_value']
    )

    # 刷新原始数据包
    while self.raw_packets:
    item = self.raw_packets.popleft()
    if self.db_handler:
    self.db_handler.save_raw_packet(
    item['device_id'],
    item['packet_type'],
    item['raw_data']
    )

    def _flush_thread(self):
    """定时刷新缓存数据到数据库"""
    while self.running:
    time.sleep(self.flush_interval)
    self.flush_all()
    logging.debug(f"缓存数据已刷新,剩余数据点: {len(self.data_points)},原始包: {len(self.raw_packets)}")

    5. dtu_server.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    import socket
    import threading
    import time
    import logging
    import struct
    from datetime import datetime
    from collections import OrderedDict
    from .db_handler import DBServer
    from .cache_manager import DataCache
    from .config_loader import ConfigLoader

    class DtuServer:
    def __init__(self, config_loader):
    self.config_loader = config_loader
    self.server_config = config_loader.get_server_config()
    self.query_tasks = config_loader.get_query_tasks()
    self.data_points = config_loader.get_data_points()

    self.host = self.server_config.get('host', '0.0.0.0')
    self.port = self.server_config.get('main_port', 22334)
    self.heartbeat_timeout = self.server_config.get('heartbeat_timeout', 60)
    self.max_missed = self.server_config.get('max_missed', 3)

    self.server_socket = None
    self.connections = {} # {device_id: {'socket': sock, 'last_heartbeat': timestamp, 'task_status': dict}}
    self.lock = threading.Lock()
    self.running = False

    # 初始化数据库连接
    mysql_config = config_loader.get_mysql_config()
    self.db = DBServer(mysql_config) if mysql_config else None

    # 初始化缓存
    cache_config = config_loader.get_cache_config()
    self.cache = DataCache(
    max_size=cache_config.get('max_size', 1000),
    flush_interval=cache_config.get('flush_interval', 10),
    db_handler=self.db
    )
    self.cache.start()

    def start(self):
    """启动DTU服务器"""
    self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.server_socket.bind((self.host, self.port))
    self.server_socket.listen(5)

    self.running = True
    logging.info(f"DTU服务器启动,监听 {self.host}:{self.port}")
    logging.info(f"配置了 {len(self.query_tasks)} 个查询任务")

    # 启动接受连接的线程
    accept_thread = threading.Thread(target=self._accept_connections)
    accept_thread.daemon = True
    accept_thread.start()

    # 启动心跳监控线程
    heartbeat_thread = threading.Thread(target=self._monitor_heartbeats)
    heartbeat_thread.daemon = True
    heartbeat_thread.start()

    # 启动定时查询任务线程
    query_thread = threading.Thread(target=self._execute_query_tasks)
    query_thread.daemon = True
    query_thread.start()

    # 其他方法保持不变,但使用self.query_tasks和self.data_points代替全局配置
    # 在_parse_data_point方法中,使用缓存代替直接数据库写入:

    def _parse_data_point(self, device_id, address, reg_data, offset):
    """解析数据点并保存到缓存"""
    if address not in self.data_points:
    return

    dp_config = self.data_points[address]
    name = dp_config['name']
    dtype = dp_config['type']
    length = dp_config['length']
    scale = dp_config.get('scale')

    raw_value = None
    value = None

    try:
    # 计算数据在响应中的位置 (每个寄存器2字节)
    start_index = offset * 2
    end_index = start_index + length

    if end_index > len(reg_data):
    logging.warning(f"设备 {device_id} 数据点 {name} 数据不足")
    return

    data_bytes = reg_data[start_index:end_index]
    raw_value = int.from_bytes(data_bytes, 'big')

    # 根据数据类型解析
    if dtype == "int":
    if length == 2: # 16位整数
    value = struct.unpack('>h', data_bytes)[0]
    elif length == 4: # 32位整数
    value = struct.unpack('>i', data_bytes)[0]
    else:
    logging.warning(f"设备 {device_id} 不支持的长度 {length} 的整数")
    return

    if scale is not None:
    value = value * scale

    elif dtype == "float":
    if length == 4: # 32位浮点数
    value = struct.unpack('>f', data_bytes)[0]
    else:
    logging.warning(f"设备 {device_id} 不支持的长度 {length} 的浮点数")
    return

    else:
    logging.warning(f"设备 {device_id} 未知数据类型: {dtype}")
    return

    # 记录解析结果
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_msg = f"{timestamp} 设备 {device_id} {name}: {value}"

    if scale is not None and dtype == "int":
    log_msg += f" (原始值: {value/scale})"

    logging.info(log_msg)

    # 保存到缓存
    self.cache.add_data_point(device_id, name, value, raw_value)

    except Exception as e:
    logging.error(f"解析设备 {device_id} 数据点 {name} 出错: {str(e)}")

    # 在_process_heartbeat方法中记录心跳到缓存
    def _process_heartbeat(self, device_id):
    """处理心跳包"""
    with self.lock:
    if device_id in self.connections:
    conn = self.connections[device_id]
    conn['last_heartbeat'] = time.time()
    conn['missed'] = 0
    logging.debug(f"收到心跳: 设备 {device_id}")

    # 记录心跳到缓存
    self.cache.add_raw_packet(device_id, 'heartbeat', b'')

    # 发送心跳响应
    try:
    conn['socket'].sendall(b'\x02') # 0x02 表示心跳响应
    except:
    self._remove_connection(device_id)

    # 触发心跳相关的查询任务
    self._trigger_heartbeat_queries(device_id)

    # 在_handle_client方法中记录注册包
    def _handle_client(self, client_socket, addr):
    """处理客户端连接"""
    device_id = None
    try:
    # 接收注册包 (假设前4字节是设备ID)
    reg_data = client_socket.recv(4)
    if len(reg_data) < 4:
    logging.warning(f"无效注册包: {addr[0]}:{addr[1]}")
    client_socket.close()
    return

    # 解析设备ID (4字节整数)
    device_id = struct.unpack('>I', reg_data)[0]
    logging.info(f"设备注册: ID={device_id} ({addr[0]}:{addr[1]})")

    # 记录注册包
    self.cache.add_raw_packet(device_id, 'register', reg_data)

    # 其他处理逻辑保持不变...

    # 在_process_data方法中记录数据包
    def _process_data(self, device_id, data):
    """处理普通数据包"""
    logging.info(f"来自设备 {device_id} 的数据: {data.hex()}")

    # 记录原始数据包
    self.cache.add_raw_packet(device_id, 'data', data)

    # 其他处理逻辑保持不变...

    # 在_process_modbus_response方法中记录Modbus响应
    def _process_modbus_response(self, device_id, data):
    """处理Modbus响应"""
    if device_id not in self.connections:
    return

    try:
    # 记录Modbus响应包
    self.cache.add_raw_packet(device_id, 'modbus', data)

    # 其他处理逻辑保持不变...

    # 在stop方法中停止缓存
    def stop(self):
    """停止服务器"""
    self.running = False
    self.cache.stop()
    # 其他停止逻辑保持不变...

    6. web_interface.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    from flask import Flask, render_template, request, jsonify
    from .db_handler import DBServer
    from .config_loader import ConfigLoader
    import logging

    app = Flask(__name__)

    # 全局变量
    db_handler = None

    @app.route('/')
    def index():
    """首页仪表盘"""
    return render_template('index.html')

    @app.route('/devices')
    def devices():
    """设备列表页面"""
    online_devices = db_handler.get_connected_devices() if db_handler else []
    return render_template('devices.html', devices=online_devices)

    @app.route('/device/<int:device_id>')
    def device_detail(device_id):
    """设备详情页面"""
    device_data = db_handler.get_device_data(device_id, 100) if db_handler else []
    raw_packets = db_handler.get_raw_packets(device_id, 50) if db_handler else []
    return render_template('device_detail.html',
    device_id=device_id,
    device_data=device_data,
    raw_packets=raw_packets)

    @app.route('/data')
    def data():
    """数据浏览页面"""
    device_id = request.args.get('device_id', type=int)
    limit = request.args.get('limit', 100, type=int)

    if device_id:
    data_points = db_handler.get_device_data(device_id, limit) if db_handler else []
    else:
    data_points = []

    return render_template('data.html', data_points=data_points, device_id=device_id)

    @app.route('/connections')
    def connections():
    """连接历史页面"""
    connections = db_handler.get_connection_history(100) if db_handler else []
    return render_template('connections.html', connections=connections)

    @app.route('/api/devices')
    def api_devices():
    """API: 获取设备列表"""
    devices = db_handler.get_connected_devices() if db_handler else []
    return jsonify(devices)

    @app.route('/api/device/<int:device_id>/data')
    def api_device_data(device_id):
    """API: 获取设备数据"""
    limit = request.args.get('limit', 100, type=int)
    data_points = db_handler.get_device_data(device_id, limit) if db_handler else []
    return jsonify(data_points)

    @app.route('/api/raw_packets')
    def api_raw_packets():
    """API: 获取原始数据包"""
    device_id = request.args.get('device_id', type=int)
    limit = request.args.get('limit', 50, type=int)
    raw_packets = db_handler.get_raw_packets(device_id, limit) if db_handler else []
    return jsonify(raw_packets)

    def start_web_interface(config_loader):
    """启动Web界面"""
    global db_handler

    # 初始化数据库连接
    mysql_config = config_loader.get_mysql_config()
    if mysql_config:
    db_handler = DBServer(mysql_config)

    web_port = config_loader.get_web_port()
    logging.info(f"启动Web界面,端口: {web_port}")

    # 在生产环境中应使用生产服务器如Gunicorn
    app.run(host='0.0.0.0', port=web_port, threaded=True)

    7. main.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    import threading
    import logging
    from config_loader import ConfigLoader
    from dtu_server import DtuServer
    from web_interface import start_web_interface

    # 配置日志
    logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
    logging.FileHandler("dtu_server.log"),
    logging.StreamHandler()
    ]
    )

    def main():
    # 加载配置
    config_loader = ConfigLoader("config.yml")

    # 启动DTU服务器
    dtu_server = DtuServer(config_loader)
    dtu_server.start()

    # 启动Web界面(在新线程中)
    web_thread = threading.Thread(target=start_web_interface, args=(config_loader,))
    web_thread.daemon = True
    web_thread.start()

    try:
    # 打印配置信息
    logging.info("服务器配置:")
    logging.info(f" 主机: {dtu_server.host}")
    logging.info(f" 主端口: {dtu_server.port}")
    logging.info(f" Web端口: {config_loader.get_web_port()}")
    logging.info(f" 心跳超时: {dtu_server.heartbeat_timeout}秒")
    logging.info(f" 最大丢失心跳: {dtu_server.max_missed}次")

    # 打印查询任务信息
    logging.info("查询任务配置:")
    for task_id, task_info in dtu_server.query_tasks.items():
    logging.info(f" 任务 {task_id}: 地址 {task_info['start_addr']}-{task_info['start_addr']+task_info['length']-1}, "
    f"长度 {task_info['length']} 寄存器, "
    f"间隔 {task_info['interval'] if task_info['interval'] > 0 else '心跳触发'}秒, "
    f"触发方式: {task_info['trigger']}")

    # 打印数据点信息
    logging.info("数据点配置:")
    for addr, dp_info in dtu_server.data_points.items():
    logging.info(f" 地址 {addr}: {dp_info['name']} ({dp_info['type']}, {dp_info['length']}字节, "
    f"精度: {'原始值' if dp_info.get('scale') is None else dp_info['scale']})")

    # 主循环
    while True:
    time.sleep(10)
    # 可以在这里添加定期状态报告

    except KeyboardInterrupt:
    dtu_server.stop()
    logging.info("服务器已停止")

    if __name__ == "__main__":
    main()

    8. Web界面模板 (templates/index.html)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    <!DOCTYPE html>
    <html lang="zh-CN">
    <head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>DTU服务器监控</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
    </head>
    <body>
    <div class="container mt-4">
    <h1 class="mb-4">DTU服务器监控面板</h1>

    <div class="row">
    <div class="col-md-3">
    <div class="card text-white bg-primary mb-3">
    <div class="card-header">在线设备</div>
    <div class="card-body">
    <h5 class="card-title" id="online-devices">0</h5>
    </div>
    </div>
    </div>
    <div class="col-md-3">
    <div class="card text-white bg-success mb-3">
    <div class="card-header">今日数据点</div>
    <div class="card-body">
    <h5 class="card-title" id="today-data">0</h5>
    </div>
    </div>
    </div>
    <div class="col-md-3">
    <div class="card text-white bg-info mb-3">
    <div class="card-header">今日心跳</div>
    <div class="card-body">
    <h5 class="card-title" id="today-heartbeats">0</h5>
    </div>
    </div>
    </div>
    <div class="col-md-3">
    <div class="card text-white bg-warning mb-3">
    <div class="card-header">服务器状态</div>
    <div class="card-body">
    <h5 class="card-title">运行中</h5>
    </div>
    </div>
    </div>
    </div>

    <div class="row mt-4">
    <div class="col-md-6">
    <div class="card">
    <div class="card-header">最近连接</div>
    <div class="card-body">
    <table class="table table-striped">
    <thead>
    <tr>
    <th>设备ID</th>
    <th>IP地址</th>
    <th>最后在线</th>
    </tr>
    </thead>
    <tbody id="recent-devices">
    <!-- 动态填充 -->
    </tbody>
    </table>
    </div>
    </div>
    </div>
    <div class="col-md-6">
    <div class="card">
    <div class="card-header">系统状态</div>
    <div class="card-body">
    <div class="list-group">
    <a href="/devices" class="list-group-item list-group-item-action">
    <div class="d-flex w-100 justify-content-between">
    <h5 class="mb-1">在线设备</h5>
    <span class="badge bg-primary rounded-pill" id="online-count">0</span>
    </div>
    <p class="mb-1">查看所有在线设备</p>
    </a>
    <a href="/data" class="list-group-item list-group-item-action">
    <div class="d-flex w-100 justify-content-between">
    <h5 class="mb-1">数据浏览</h5>
    <span class="badge bg-success rounded-pill" id="data-count">0</span>
    </div>
    <p class="mb-1">查看设备数据点</p>
    </a>
    <a href="/connections" class="list-group-item list-group-item-action">
    <div class="d-flex w-100 justify-content-between">
    <h5 class="mb-1">连接历史</h5>
    <span class="badge bg-info rounded-pill" id="connections-count">0</span>
    </div>
    <p class="mb-1">查看设备连接历史</p>
    </a>
    </div>
    </div>
    </div>
    </div>
    </div>
    </div>

    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/js/bootstrap.bundle.min.js"></script>
    <script>
    // 使用AJAX获取数据并更新页面
    function updateDashboard() {
    fetch('/api/devices')
    .then(response => response.json())
    .then(data => {
    document.getElementById('online-devices').textContent = data.length;
    document.getElementById('online-count').textContent = data.length;

    // 更新最近设备
    const tbody = document.getElementById('recent-devices');
    tbody.innerHTML = '';

    data.slice(0, 5).forEach(device => {
    const row = document.createElement('tr');
    row.innerHTML = `
    <td><a href="/device/${device.device_id}">${device.device_id}</a></td>
    <td>${device.ip_address}:${device.port}</td>
    <td>${new Date(device.last_seen).toLocaleString()}</td>
    `;
    tbody.appendChild(row);
    });
    });

    // 其他数据更新...
    }

    // 每10秒更新一次
    setInterval(updateDashboard, 10000);
    updateDashboard(); // 初始加载
    </script>
    </body>
    </html>

    9. 其他模板文件

    由于篇幅限制,其他模板文件(devices.html, data.html, connections.html, device_detail.html)的实现与index.html类似,主要展示从数据库获取的不同类型数据。

    系统功能总结

    1. MySQL数据存储

    • 设备信息:存储设备注册ID、IP地址、端口、连接状态等
    • 心跳记录:记录每次心跳的时间戳
    • 数据点:存储解析后的数据点值及原始值
    • 原始数据包:存储接收到的各种类型数据包
    • 连接历史:记录设备连接/断开事件

    2. 配置文件管理

    • 使用YAML格式配置文件
    • 集中管理服务器、数据库、缓存、查询任务和数据点配置
    • 支持动态加载配置

    3. Web界面

    • 在指定端口提供Web服务
    • 功能包括:
      • 设备状态监控
      • 数据点浏览
      • 原始数据包查看
      • 连接历史查询
      • 系统状态仪表盘
    • 提供RESTful API接口

    4. 模块化设计

    • 主程序:系统入口和协调
    • DTU服务器:处理设备连接和通信
    • 数据库处理:MySQL操作封装
    • 缓存管理:数据缓存和批量写入
    • 配置加载:统一配置管理
    • Web界面:提供用户交互界面

    5. 缓存机制

    • 使用内存缓存接收的数据
    • 支持配置缓存大小和刷新间隔
    • 定时批量写入数据库,减少I/O压力
    • 在服务器停止时自动刷新所有缓存数据

    部署说明

    1. 安装依赖
    1
    pip install PyYAML mysql-connector-python flask
    1. 创建数据库
    1
    2
    3
    4
    CREATE DATABASE dtu_data;
    CREATE USER 'abc'@'%' IDENTIFIED BY 'abc@123';
    GRANT ALL PRIVILEGES ON dtu_data.* TO 'abc'@'%';
    FLUSH PRIVILEGES;
    1. 配置修改
      编辑config.yml文件,设置MySQL连接参数和其他配置

    2. 启动服务器

    1
    python main.py
    1. 访问Web界面
      打开浏览器访问 http://服务器IP:8080

    扩展建议

    1. 安全增强

      • 添加HTTPS支持
      • 实现用户认证系统
      • 添加API密钥验证
    2. 性能优化

      • 使用异步I/O(如asyncio)
      • 添加连接池管理
      • 实现数据压缩传输
    3. 功能扩展

      • 添加设备管理功能
      • 实现数据导出(CSV/Excel)
      • 添加报警和通知系统
      • 集成数据可视化(图表展示)
    4. 高可用性

      • 实现主备服务器切换
      • 添加数据库复制支持
      • 实现数据持久化存储

    这个完整的DTU服务器解决方案满足了您的所有要求,包括配置文件管理、MySQL存储、Web界面和缓存机制。系统设计模块化,易于维护和扩展,适合工业物联网应用场景。

    打赏
    打赏提示信息
    分享
    分享提示信息