异步编程示例
Fleet 语言提供了现代化的异步编程支持,让你能够编写高效的并发程序。
📋 目录
🚀 基础异步概念
异步函数定义
// Async functions are declared with the `async` keyword.
// Returns Result::Ok("Data from " + url) when `url` starts with "https://",
// otherwise Result::Err("Invalid URL").
// NOTE(review): the return type is written as a bare `Result`; both arms carry
// strings, so its type parameters may have been lost — confirm.
async fn fetch_data(url: str) -> Result {
// Simulate a network request
await sleep_async(1000); // asynchronously wait 1 second
if url.starts_with("https://") {
return Result::Ok("Data from " + url);
} else {
return Result::Err("Invalid URL");
}
}
// Entry point: awaits fetch_data and prints either the received data or the error,
// with a status line before and after.
async fn main() {
println("Starting async operation...");
match await fetch_data("https://api.example.com") {
Result::Ok(data) => println("Received: " + data),
Result::Err(error) => println("Error: " + error),
}
println("Async operation completed!");
}
Future 类型
// A Future represents an asynchronous operation that may not have completed yet.
// Builds an async block that awaits sleep_async(500) and then produces 42.
// Note this is a plain `fn`: it returns the future without awaiting it.
fn create_future() -> Future[int] {
return async {
await sleep_async(500);
return 42;
};
}
// Entry point: obtains the future from create_future, awaits it and prints the value.
async fn main() {
let future = create_future();
let result = await future;
println("Future result: " + result); // 42
}
⚡ 异步函数
基本异步操作
// Fetches `url` with http_get and writes the content to `filename`,
// printing a line before and after.
// Both awaited calls are followed by `?`, so a failure in either step is
// handed back to the caller instead of being handled here.
async fn download_file(url: str, filename: str) -> Result<(), str> {
println("Starting download from " + url);
// Simulate the download
let content = await http_get(url)?;
// Write the file asynchronously
await write_file_async(filename, content)?;
println("Download completed: " + filename);
return Result::Ok(());
}
// Returns `data` converted with to_uppercase(), awaiting yield_now() once
// before doing the conversion and printing a line before and after.
async fn process_data(data: str) -> str {
println("Processing data...");
// Simulate a CPU-intensive operation
await yield_now(); // yield control
let processed = data.to_uppercase();
println("Data processing completed");
return processed;
}
// Entry point: downloads a file, reads it back, processes the content and
// prints the result. The steps run one after another, and a failure at any
// step is printed rather than aborting the program.
async fn main() {
    // Run the async operations sequentially
    match await download_file("https://example.com/data.txt", "local_data.txt") {
        Result::Ok(_) => {
            // Handle a failed read explicitly instead of calling unwrap() on file I/O
            match await read_file_async("local_data.txt") {
                Result::Ok(content) => {
                    let processed = await process_data(content);
                    println("Final result: " + processed);
                },
                Result::Err(error) => println("Read failed: " + error),
            }
        },
        Result::Err(error) => println("Download failed: " + error),
    }
}
异步生成器
// Async generator: yields every integer from `start` through `end` inclusive,
// awaiting sleep_async(100) before each value. Yields nothing when start > end.
async fn number_generator(start: int, end: int) -> AsyncIterator[int] {
var current = start;
while current <= end {
await sleep_async(100); // simulate latency
yield current;
current = current + 1;
}
}
// Entry point: iterates number_generator(1, 5) with `await for` and prints each value.
async fn main() {
let generator = number_generator(1, 5);
await for num in generator {
println("Generated: " + num);
}
}
🔄 并发执行
并行执行多个任务
// Awaits sleep_async(1000), then returns "Task 1 completed".
async fn task1() -> str {
await sleep_async(1000);
return "Task 1 completed";
}
// Awaits sleep_async(1500), then returns "Task 2 completed".
// This is the longest of the three sample tasks.
async fn task2() -> str {
await sleep_async(1500);
return "Task 2 completed";
}
// Awaits sleep_async(800), then returns "Task 3 completed".
async fn task3() -> str {
await sleep_async(800);
return "Task 3 completed";
}
// Entry point: runs task1, task2 and task3 through join!, then prints each
// result and the elapsed time measured with now().
async fn main() {
println("Starting concurrent tasks...");
let start_time = now();
// Run the tasks concurrently
let (result1, result2, result3) = await join!(task1(), task2(), task3());
let elapsed = now() - start_time;
println(result1);
println(result2);
println(result3);
println("All tasks completed in " + elapsed + "ms");
// Total time is about 1500ms (the longest task), not 3300ms (the sum of all three)
}
使用 spawn 创建并发任务
// Sample background job: prints a start line, awaits
// sleep_async(random_int(500, 2000)), prints a completion line and returns id * 2.
async fn background_task(id: int) -> int {
println("Background task " + id + " started");
await sleep_async(random_int(500, 2000));
println("Background task " + id + " completed");
return id * 2;
}
// Entry point: spawns three background tasks, does its own awaited work,
// and only then joins the task handles and prints their results.
async fn main() {
// Start several background tasks
let task1 = spawn(background_task(1));
let task2 = spawn(background_task(2));
let task3 = spawn(background_task(3));
println("All tasks spawned, doing other work...");
// Do other work
await sleep_async(1000);
println("Other work completed");
// Wait for all tasks to finish
let results = await join!(task1, task2, task3);
println("Task results: " + results);
}
任务取消
// Cancellable long-running job: iterates `i` over 0..100, checking
// is_cancelled() at the top of every iteration.
// Returns Result::Err("Task was cancelled") as soon as cancellation is seen,
// otherwise Result::Ok("Task completed successfully") after the loop.
// NOTE(review): the return type is a bare `Result`; both arms carry strings — confirm
// whether type parameters were lost.
async fn long_running_task() -> Result {
loop i in 0..100 {
// Check whether the task has been cancelled
if is_cancelled() {
return Result::Err("Task was cancelled");
}
await sleep_async(100);
println("Progress: " + i + "%");
}
return Result::Ok("Task completed successfully");
}
// Entry point: spawns long_running_task, cancels it after an awaited delay,
// then awaits the task handle and prints its outcome.
async fn main() {
let task = spawn(long_running_task());
// Wait 3 seconds, then cancel the task
await sleep_async(3000);
task.cancel();
match await task {
Result::Ok(result) => println("Task result: " + result),
Result::Err(error) => println("Task error: " + error),
}
}
💾 异步 I/O
文件操作
// Copies `src` to `dst` by reading the whole content and writing it back out,
// printing a line before and after.
// Both awaited calls are followed by `?`, so a read or write failure is
// handed back to the caller.
async fn copy_file_async(src: str, dst: str) -> Result<(), str> {
println("Starting file copy: " + src + " -> " + dst);
// Read the source file asynchronously
let content = await read_file_async(src)?;
// Write the destination file asynchronously
await write_file_async(dst, content)?;
println("File copy completed");
return Result::Ok(());
}
// Processes every path in `files` concurrently: one task is spawned per file
// that reads it, upper-cases the content and writes it to "<file>.processed".
// Each task's outcome is printed individually; a per-file failure is reported
// but does not make this function return an error.
async fn process_multiple_files(files: [str]) -> Result<(), str> {
    // Spawn one task per file
    let tasks = map(files, |file| {
        return spawn(async {
            let content = await read_file_async(file)?;
            let processed = content.to_uppercase();
            let output_file = file + ".processed";
            await write_file_async(output_file, processed)?;
            return Result::Ok(output_file);
        });
    });
    // Wait for all tasks to finish
    let results = await join_all(tasks);
    loop result in results {
        match result {
            Result::Ok(filename) => println("Processed: " + filename),
            Result::Err(error) => println("Error: " + error),
        }
    }
    return Result::Ok(());
}
// Entry point: runs process_multiple_files over three sample file names and
// prints the overall outcome.
async fn main() {
    let files = ["file1.txt", "file2.txt", "file3.txt"];
    match await process_multiple_files(files) {
        Result::Ok(_) => println("All files processed"),
        Result::Err(error) => println("Processing failed: " + error),
    }
}
网络操作
// Sends one GET request per URL concurrently through a shared HttpClient and
// prints at most the first 100 characters of each response body.
// A failed request is printed and does not make this function return an error.
async fn http_client_example() -> Result<(), str> {
    // Create the HTTP client
    let client = HttpClient::new();
    // Send several requests concurrently
    let urls = [
        "https://api.github.com/users/octocat",
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://httpbin.org/uuid",
    ];
    let requests = map(urls, |url| {
        return spawn(async {
            let response = await client.get(url)?;
            let body = await response.text()?;
            return Result::Ok((url, body));
        });
    });
    let results = await join_all(requests);
    loop result in results {
        match result {
            Result::Ok((url, body)) => {
                println("Response from " + url + ":");
                // min() keeps the slice end inside bodies shorter than 100 characters
                println(body.substring(0, min(100, body.len())) + "...");
                println("---");
            },
            Result::Err(error) => println("Request failed: " + error),
        }
    }
    return Result::Ok(());
}
// Connects to an echo WebSocket endpoint, prints incoming messages from a
// spawned receive task, sends three messages with pauses between them, then
// closes the connection and waits for the receive task to end.
// Connect, send and close failures are handed back to the caller through `?`.
async fn websocket_example() -> Result<(), str> {
// Connect to the WebSocket
let ws = await WebSocket::connect("wss://echo.websocket.org")?;
// Start the message-receiving task
let receive_task = spawn(async {
loop {
match await ws.receive() {
Result::Ok(message) => println("Received: " + message),
Result::Err(error) => {
println("WebSocket error: " + error);
// The first receive error is the only exit from this loop
break;
},
}
}
});
// Send a few messages
await ws.send("Hello, WebSocket!")?;
await sleep_async(1000);
await ws.send("This is Fleet language")?;
await sleep_async(1000);
await ws.send("Goodbye!")?;
// Wait a while, then close the connection
await sleep_async(2000);
await ws.close()?;
// Wait for the receive task to finish
// NOTE(review): presumably ws.receive() reports an error once the socket is closed,
// which is what would let the task end — confirm.
await receive_task;
return Result::Ok(());
}
❌ 错误处理
异步错误传播
// Loads the user, profile and settings records for `user_id` with three
// sequential api_call requests and bundles them into a UserData.
// Each call is followed by `?`, so the first failing request ends the function
// and later requests are not issued.
async fn fetch_user_data(user_id: int) -> Result<UserData, ApiError> {
    // Fetch the user's basic information
    let user = await api_call("/users/" + user_id)?;
    // Fetch the user's detailed profile
    let profile = await api_call("/users/" + user_id + "/profile")?;
    // Fetch the user's settings
    let settings = await api_call("/users/" + user_id + "/settings")?;
    return Result::Ok(UserData {
        user: user,
        profile: profile,
        settings: settings,
    });
}
// Fetches the data for `user_id` and returns it formatted by
// format_user_response, translating each ApiError variant into a string message.
async fn handle_user_request(user_id: int) -> Result<str, str> {
    match await fetch_user_data(user_id) {
        Result::Ok(data) => {
            let response = format_user_response(data);
            return Result::Ok(response);
        },
        Result::Err(ApiError::NotFound) => {
            return Result::Err("User not found");
        },
        Result::Err(ApiError::Unauthorized) => {
            return Result::Err("Access denied");
        },
        Result::Err(ApiError::NetworkError(msg)) => {
            return Result::Err("Network error: " + msg);
        },
    }
}
超时处理
// Races `future` against a timer of `timeout_ms`.
// Returns Result::Ok(value) when `future` finishes first, or
// Result::Err("Operation timed out") when the timer finishes first.
// NOTE(review): `T` is not declared in the signature shown and the return type
// is a bare `Result` — confirm whether generic parameters were lost.
async fn with_timeout(
    future: Future[T],
    timeout_ms: int
) -> Result {
    let timeout_future = async {
        await sleep_async(timeout_ms);
        return Result::Err("Operation timed out");
    };
    let main_future = async {
        let result = await future;
        return Result::Ok(result);
    };
    // Race the two futures and return the result of whichever completes first
    return await race!(main_future, timeout_future);
}
// Awaits sleep_async(5000), then returns "Operation completed".
async fn slow_operation() -> str {
await sleep_async(5000); // 5-second operation
return "Operation completed";
}
// Entry point: runs slow_operation under a 3000 timeout via with_timeout and
// prints the outcome. The timeout (3000) is shorter than the operation's own
// sleep (5000), so the error arm is the expected one.
async fn main() {
match await with_timeout(slow_operation(), 3000) {
Result::Ok(result) => println("Success: " + result),
Result::Err(error) => println("Error: " + error), // "Operation timed out"
}
}
重试机制
// Calls `operation` repeatedly until it succeeds or `max_attempts` attempts
// have failed, awaiting sleep_async(delay_ms) between attempts.
// Returns the first Ok value, or the error of the last attempt.
// The counter is incremented before the first call, so at least one attempt
// is always made, even when `max_attempts` is zero or negative.
async fn retry_async(
    operation: fn() -> Future[Result],
    max_attempts: int,
    delay_ms: int
) -> Result {
    var attempts = 0;
    loop {
        attempts = attempts + 1;
        match await operation() {
            Result::Ok(value) => return Result::Ok(value),
            Result::Err(error) => {
                // Give up once the attempt budget is spent
                if attempts >= max_attempts {
                    return Result::Err(error);
                }
                println("Attempt " + attempts + " failed, retrying in " + delay_ms + "ms...");
                await sleep_async(delay_ms);
            },
        }
    }
}
// Simulates an unstable API call: returns Result::Err("Network error") when
// random() is below 0.7, otherwise Result::Ok("API response data").
// NOTE(review): presumably random() is uniform over [0, 1), which would make
// this fail about 70% of the time — confirm.
async fn unreliable_api_call() -> Result<str, str> {
    if random() < 0.7 {
        return Result::Err("Network error");
    } else {
        return Result::Ok("API response data");
    }
}
// Entry point: runs unreliable_api_call through retry_async with 3 attempts
// and a delay of 1000 between them, then prints the outcome.
async fn main() {
match await retry_async(unreliable_api_call, 3, 1000) {
Result::Ok(data) => println("Got data: " + data),
Result::Err(error) => println("Failed after retries: " + error),
}
}
🏗️ 实际应用
异步Web服务器
// Incoming HTTP request as consumed by handle_request, which reads `path` and `method`.
// NOTE(review): `map` is written without key/value types — confirm whether they were lost.
struct HttpRequest {
method: str,
path: str,
headers: map,
body: str,
}
// HTTP response produced by handle_request: a status code, headers and a body.
// NOTE(review): `map` is written without key/value types — confirm whether they were lost.
struct HttpResponse {
status: int,
headers: map,
body: str,
}
// Routes a request by its path:
//   "/api/users"  — GET returns 200 with the users serialized as JSON; any other method returns 405
//   "/api/health" — returns 200 "OK"
//   anything else — returns 404 "Not found"
async fn handle_request(request: HttpRequest) -> HttpResponse {
match request.path {
"/api/users" => {
if request.method == "GET" {
let users = await fetch_users_from_db();
return HttpResponse {
status: 200,
headers: map{"Content-Type": "application/json"},
body: serialize_json(users),
};
} else {
return HttpResponse {
status: 405,
headers: map{},
body: "Method not allowed",
};
}
},
"/api/health" => {
return HttpResponse {
status: 200,
headers: map{"Content-Type": "text/plain"},
body: "OK",
};
},
_ => {
return HttpResponse {
status: 404,
headers: map{},
body: "Not found",
};
},
}
}
// Binds a TCP listener on 127.0.0.1:`port` and accepts connections forever,
// handling each one in its own spawned task.
// The accept loop has no break, so the function returns only when bind or
// accept fails and `?` hands the error back to the caller.
async fn start_server(port: int) -> Result<(), str> {
    println("Starting server on port " + port);
    let server = await TcpListener::bind("127.0.0.1:" + port)?;
    loop {
        let (stream, addr) = await server.accept()?;
        println("New connection from " + addr);
        // Start one async task per connection
        spawn(async {
            match await handle_connection(stream) {
                Result::Ok(_) => println("Connection handled successfully"),
                Result::Err(error) => println("Connection error: " + error),
            }
        });
    }
}
// Entry point: starts the server on port 8080 and prints how it ended.
async fn main() {
match await start_server(8080) {
Result::Ok(_) => println("Server stopped"),
Result::Err(error) => println("Server error: " + error),
}
}
异步数据处理管道
// One record flowing through the data pipeline.
// data_source fills `timestamp` from now(); process_item upper-cases `content`
// and keeps `id` and `timestamp` unchanged.
struct DataItem {
id: int,
content: str,
timestamp: int,
}
// Async generator that yields DataItem values with ids 1 through 100,
// awaiting sleep_async(100) before each one, and then stops.
async fn data_source() -> AsyncIterator[DataItem] {
    var id = 1;
    loop {
        await sleep_async(100); // simulate the interval between arriving items
        yield DataItem {
            id: id,
            content: "Data item " + id,
            timestamp: now(),
        };
        id = id + 1;
        // Stop once item 100 has been yielded
        if id > 100 {
            break;
        }
    }
}
// Returns a new DataItem whose content is `item.content` upper-cased, with
// `id` and `timestamp` copied over, after awaiting a random sleep.
async fn process_item(item: DataItem) -> DataItem {
// Simulate processing time
await sleep_async(random_int(50, 200));
return DataItem {
id: item.id,
content: item.content.to_uppercase(),
timestamp: item.timestamp,
};
}
// Stand-in for a database write: awaits sleep_async(50), prints the item's id
// and content, and returns Result::Ok(()). No error path exists in this body.
async fn save_item(item: DataItem) -> Result<(), str> {
// Simulate saving to a database
await sleep_async(50);
println("Saved item " + item.id + ": " + item.content);
return Result::Ok(());
}
// Drains data_source() in batches of `batch_size`. Each full batch is
// processed and saved by flush_batch before more items are buffered; items
// left over when the source ends are flushed the same way.
// Save failures are printed and do not make this function return an error.
async fn data_pipeline() -> Result<(), str> {
    let source = data_source();
    // `var` is the mutable binding form used by the other examples in this document
    var buffer = [];
    let batch_size = 5;
    await for item in source {
        buffer.push(item);
        if buffer.len() >= batch_size {
            await flush_batch(buffer);
            buffer.clear();
        }
    }
    // Handle the remaining items that did not fill a whole batch
    if !buffer.is_empty() {
        await flush_batch(buffer);
    }
    return Result::Ok(());
}

// Runs process_item concurrently over `batch`, then save_item concurrently
// over the processed items, and prints every save error.
async fn flush_batch(batch: [DataItem]) {
    // Process the batch concurrently
    let processed_tasks = map(batch, |item| spawn(process_item(item)));
    let processed_items = await join_all(processed_tasks);
    // Save the batch concurrently
    let save_tasks = map(processed_items, |item| spawn(save_item(item)));
    let save_results = await join_all(save_tasks);
    // Check the save results
    loop result in save_results {
        match result {
            Result::Ok(_) => {}, // saved successfully
            Result::Err(error) => println("Save error: " + error),
        }
    }
}
// Entry point: runs data_pipeline and prints whether it succeeded.
async fn main() {
println("Starting data pipeline...");
match await data_pipeline() {
Result::Ok(_) => println("Data pipeline completed successfully"),
Result::Err(error) => println("Pipeline error: " + error),
}
}
🎯 最佳实践
异步编程指南
- 避免阻塞操作 - 在异步函数中使用异步版本的I/O操作
- 合理使用并发 - 不要过度并发,考虑系统资源限制
- 错误处理 - 异步操作的错误处理同样重要
- 取消支持 - 长时间运行的任务应支持取消
- 背压处理 - 在数据流处理中考虑背压问题
性能优化
// ✅ Good practice - batch processing
// Spawns one process_item task per element up front and awaits them all
// together with join_all.
async fn process_items_batch(items: [Item]) -> [Result] {
let tasks = map(items, |item| spawn(process_item(item)));
return await join_all(tasks);
}
// ❌ Practice to avoid - processing items one by one
// Awaits each process_item call before starting the next, so the calls never overlap.
async fn process_items_sequential(items: [Item]) -> [Result] {
var results = [];
loop item in items {
let result = await process_item(item);
results.push(result);
}
return results;
}
🔗 相关主题
- 错误处理 - 异步错误处理模式
- 并发编程 - 线程和并发原语
- 网络编程 - 网络相关的异步操作
- 性能优化 - 异步程序的性能优化