有了 InfluxDB 环境,接下来就是往里面写数据了。InfluxDB 使用一种叫 Line Protocol 的格式来接收数据,这个协议设计得很巧妙,既简洁又功能强大。
掌握 Line Protocol 是使用 InfluxDB 的基础技能,就像学 SQL 一样重要。
Line Protocol 基础语法结构图,展示了measurement、tag set、field set和 timestamp 四个核心组成部分及其语法规则

Line Protocol的格式看起来是这样的:
text measurement,tag_key1=tag_value1,tag_key2=tag_value2 field_key1=field_value1,field_key2=field_value2 timestamp

与 CSV 相似,在 InfluxDB 行协议中,一条数据和另一条数据之间使用换行符分隔, 所以一行就是一条数据。另外,在时序数据库领域,一行数据一行数据由下面 4 种元素构成。这一行包含了四个部分:
来看几个真实的例子:
text # 温度传感器数据 temperature,room=living_room,sensor=DHT22 value=23.5,humidity=65.2 1640995200000000000 # 服务器监控数据 cpu_usage,host=server01,region=us-west cpu_percent=85.2,memory_percent=72.1 1640995260000000000 # 网站访问统计 page_views,page=/home,user_agent=chrome count=1,response_time=120 1640995320000000000
看起来很直观对吧?每一行就是一条数据记录。
Line Protocol 数据类型和格式规则图,详细展示了 Tag 和 Field 的数据类型、格式要求以及时间戳的不同精度格式

测量名就是你数据的分类,类似于数据库表名:
text # 好的测量名 temperature cpu_usage network_traffic user_login # 避免的测量名 temperature data # 不要有空格 cpu-usage% # 避免特殊字符
如果测量名包含空格或特殊字符,需要用反斜杠转义:
text my\ measurement,tag1=value1 field1=100
标签用来给数据分类,方便后续查询。记住几个要点:
标签值只能是字符串
text # 正确 location=room1,sensor_type=temperature # 错误 - 标签值不能是数字 room_number=1 # 应该写成 room_number="1"
标签用于索引和筛选
text # 这些适合做标签 host=server01 # 服务器名 region=us-west # 地区 environment=production # 环境 device_type=sensor # 设备类型
标签顺序会影响性能
text # 推荐:把基数小的标签放前面 environment=prod,region=us-west,host=server01 # 不推荐:把基数大的标签放前面 host=server01,region=us-west,environment=prod
字段存储实际的数值数据:
支持多种数据类型
text # 整数 count=100i # 浮点数 temperature=23.5 cpu_percent=85.2 # 字符串(需要双引号) status="online" message="system started" # 布尔值 is_active=true is_error=false
字段可以进行数学运算
text # 多个字段 cpu_usage,host=server01 user=45.2,system=12.8,idle=42.0
时间戳是可选的,支持多种格式:
text # 纳秒时间戳(默认) temperature,room=living_room value=23.5 1640995200000000000 # 不指定时间戳,使用当前时间 temperature,room=living_room value=23.5 # RFC3339格式 temperature,room=living_room value=23.5 2024-01-01T12:00:00Z
InfluxDB 数据写入方法对比图,展示了CLI、文件批量导入和HTTP API三种写入方式的特点、使用场景和性能对比

最简单的方式是用 influx CLI:
sh # 单条数据写入 influx write \ --bucket mybucket \ --org myorg \ --token $INFLUX_TOKEN \ 'temperature,location=room1 value=23.5' # 多条数据写入 influx write \ --bucket mybucket \ --org myorg \ --token $INFLUX_TOKEN \ 'temperature,location=room1 value=23.5 temperature,location=room2 value=25.1 humidity,location=room1 value=65.2'
如果有大量数据,可以先写到文件里:
sh # 创建数据文件 data.txt cat > data.txt << EOF temperature,location=room1,sensor=DHT22 value=23.5,humidity=65.2 temperature,location=room2,sensor=DHT22 value=25.1,humidity=62.8 cpu_usage,host=server01,region=us-west cpu_percent=85.2,memory_percent=72.1 cpu_usage,host=server02,region=us-west cpu_percent=78.9,memory_percent=68.5 EOF # 批量导入 influx write \ --bucket mybucket \ --org myorg \ --token $INFLUX_TOKEN \ --file data.txt
也可以直接用HTTP API:
sh curl -XPOST "http://localhost:8086/api/v2/write?org=myorg&bucket=mybucket" \ -H "Authorization: Token $INFLUX_TOKEN" \ -H "Content-Type: text/plain; charset=utf-8" \ --data-binary 'temperature,location=room1 value=23.5'
InfluxDB 编程语言客户端对比图,展示了Python、Java、Go、Node.js和C#客户端的特点、性能评级和适用场景

此处我们来看下 Java 客户端的实现:
java import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.WriteApiBlocking; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import java.time.Instant; import java.util.ArrayList; import java.util.List; public class InfluxDBExample { public static void main(String[] args) { // 连接配置 String url = "http://localhost:8086"; String token = "your-token"; String org = "myorg"; String bucket = "mybucket"; InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray()); WriteApiBlocking writeApi = client.getWriteApiBlocking(); // 方式1:使用Point对象 Point point = Point.measurement("temperature") .addTag("location", "room1") .addTag("sensor", "DHT22") .addField("value", 23.5) .addField("humidity", 65.2) .time(Instant.now(), WritePrecision.NS); writeApi.writePoint(bucket, org, point); // 方式2:使用Line Protocol字符串 String lineProtocol = "temperature,location=room1,sensor=DHT22 value=23.5,humidity=65.2"; writeApi.writeRecord(bucket, org, WritePrecision.NS, lineProtocol); // 方式3:批量写入 List<Point> points = new ArrayList<>(); for (int i = 0; i < 100; i++) { Point batchPoint = Point.measurement("temperature") .addTag("location", "room" + i) .addField("value", 20.0 + i * 0.1) .time(Instant.now(), WritePrecision.NS); points.add(batchPoint); } writeApi.writePoints(bucket, org, points); // 方式4:使用POJO对象 TemperatureData data = new TemperatureData(); data.location = "room1"; data.sensor = "DHT22"; data.value = 23.5; data.humidity = 65.2; data.time = Instant.now(); writeApi.writeMeasurement(bucket, org, WritePrecision.NS, data); client.close(); } } // POJO类定义 import com.influxdb.annotations.Column; import com.influxdb.annotations.Measurement; import java.time.Instant; @Measurement(name = "temperature") public class TemperatureData { @Column(tag = true) public String location; @Column(tag = true) public String sensor; @Column public Double value; @Column public Double humidity; @Column(timestamp = true) public Instant time; }
Maven 依赖配置:
xml <dependency> <groupId>com.influxdb</groupId> <artifactId>influxdb-client-java</artifactId> <version>6.10.0</version> </dependency>
InfluxDB 批量写入优化流程图,展示了从数据准备到性能监控的完整优化策略和最佳实践

不要一条一条地写入数据,批量写入效率更高:
sh # 不推荐:逐条写入 for data in sensor_data: write_api.write(bucket="mybucket", record=data) # 推荐:批量写入 batch_size = 1000 points = [] for data in sensor_data: points.append(create_point(data)) if len(points) >= batch_size: write_api.write(bucket="mybucket", record=points) points = [] # 写入剩余数据 if points: write_api.write(bucket="mybucket", record=points)
对于高频写入场景,使用异步写入:
sh from influxdb_client.client.write_api import WriteOptions # 配置异步写入 write_options = WriteOptions( batch_size=1000, flush_interval=10_000, # 10秒 jitter_interval=2_000, # 2秒抖动 retry_interval=5_000, # 重试间隔 max_retries=3 ) write_api = client.write_api(write_options=write_options)
对于大量数据,启用压缩能节省带宽:
sh client = InfluxDBClient( url="http://localhost:8086", token="your-token", org="myorg", enable_gzip=True # 启用压缩 )
InfluxDB Line Protocol 常见错误诊断与解决方案图,提供了完整的错误分类、诊断流程和解决方法

text # 错误:标签和字段之间缺少空格 temperature,room=living_roomvalue=23.5 # 正确:标签和字段之间要有空格 temperature,room=living_room value=23.5 # 错误:字段值包含空格但没有引号 status,host=server01 message=system started # 正确:字符串字段值要用双引号 status,host=server01 message="system started"
text # 错误:标签值不能是数字 temperature,room_number=1 value=23.5 # 正确:标签值必须是字符串 temperature,room_number="1" value=23.5 # 错误:整数字段没有i后缀 count,type=request value=100 # 正确:整数字段要加i后缀 count,type=request value=100i
text # 错误:时间戳精度不对 temperature,room=living_room value=23.5 1640995200 # 正确:使用纳秒时间戳 temperature,room=living_room value=23.5 1640995200000000000
sh # 控制写入频率,避免过于频繁 import time last_write_time = 0 min_interval = 1 # 最小间隔1秒 def write_data(data): global last_write_time current_time = time.time() if current_time - last_write_time >= min_interval: write_api.write(bucket="mybucket", record=data) last_write_time = current_time
掌握了 Line Protocol,你就能灵活地向 InfluxDB 写入各种时间序列数据了。下一篇我们会学习如何查询这些数据,让数据真正发挥价值。
本文作者:张豪
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!