异步编程示例

Fleet 语言提供了现代化的异步编程支持,让你能够编写高效的并发程序。

📋 目录

  • 基础异步概念
  • 异步函数
  • 并发执行
  • 异步 I/O
  • 错误处理
  • 实际应用
  • 最佳实践
  • 相关主题

🚀 基础异步概念

异步函数定义

// Async functions are declared with the `async` keyword.
// Stands in for a network fetch: waits, then returns Ok("Data from " + url)
// when `url` starts with "https://", or Err("Invalid URL") otherwise.
async fn fetch_data(url: str) -> Result {
    // Simulated network request: asynchronous wait of 1 second.
    await sleep_async(1000);

    // Guard clause: anything that is not an https:// URL is rejected.
    if !url.starts_with("https://") {
        return Result::Err("Invalid URL");
    }

    return Result::Ok("Data from " + url);
}

// Entry point of this example: awaits fetch_data once and prints the outcome.
async fn main() {
    println("Starting async operation...");

    // Await the call first, then branch on the Result it produced.
    let outcome = await fetch_data("https://api.example.com");
    match outcome {
        Result::Err(reason) => println("Error: " + reason),
        Result::Ok(payload) => println("Received: " + payload),
    }

    println("Async operation completed!");
}

Future 类型

// A Future represents an asynchronous operation that may not have completed yet.
// Builds such a value from an async block that waits and then returns 42.
fn create_future() -> Future[int] {
    let pending = async {
        await sleep_async(500);
        return 42;
    };

    return pending;
}

// Obtains the future from create_future, awaits it, and prints the value.
async fn main() {
    let pending = create_future();
    let value = await pending;
    println("Future result: " + value);  // 42
}

⚡ 异步函数

基本异步操作

// Fetches `url` with http_get and writes the result to `filename`.
// NOTE(review): the `?` after each await presumably propagates an Err to the
// caller (Rust-style) — confirm against the Fleet reference.
// Returns Result::Ok(()) once both steps have gone through.
async fn download_file(url: str, filename: str) -> Result<(), str> {
    println("Starting download from " + url);

    // Simulated download step.
    let body = await http_get(url)?;

    // Asynchronous write to the target file.
    await write_file_async(filename, body)?;

    println("Download completed: " + filename);
    return Result::Ok(());
}

// Upper-cases `data`, logging a message before and after the work.
async fn process_data(data: str) -> str {
    println("Processing data...");

    // Simulated CPU-bound work: hand control back before doing it.
    await yield_now();

    let upper = data.to_uppercase();
    println("Data processing completed");

    return upper;
}

// Runs the async steps one after another: download, read back, process, print.
async fn main() {
    // Sequential execution: each await finishes before the next step starts.
    let download = await download_file("https://example.com/data.txt", "local_data.txt");

    match download {
        Result::Err(error) => println("Download failed: " + error),
        Result::Ok(_) => {
            let text = await read_file_async("local_data.txt").unwrap();
            let upper = await process_data(text);
            println("Final result: " + upper);
        },
    }
}

异步生成器

// Async generator: yields every integer from `start` through `end` (inclusive,
// per the `<=` loop condition), awaiting sleep_async(100) before each value.
async fn number_generator(start: int, end: int) -> AsyncIterator[int] {
    var current = start;
    while current <= end {
        await sleep_async(100);  // simulated delay
        yield current;
        current = current + 1;
    }
}

// Consumes number_generator(1, 5) with `await for`, printing each value.
async fn main() {
    let generator = number_generator(1, 5);

    // `await for` iterates the AsyncIterator returned above.
    await for num in generator {
        println("Generated: " + num);
    }
}

🔄 并发执行

并行执行多个任务

// Example task: awaits sleep_async(1000), then returns its completion message.
async fn task1() -> str {
    await sleep_async(1000);
    return "Task 1 completed";
}

// Example task: awaits sleep_async(1500) (the longest of the three tasks),
// then returns its completion message.
async fn task2() -> str {
    await sleep_async(1500);
    return "Task 2 completed";
}

// Example task: awaits sleep_async(800), then returns its completion message.
async fn task3() -> str {
    await sleep_async(800);
    return "Task 3 completed";
}

// Runs task1/task2/task3 through join! and reports how long the batch took.
async fn main() {
    println("Starting concurrent tasks...");
    let started = now();

    // Run the three tasks concurrently and wait for all of them.
    let (first, second, third) = await join!(task1(), task2(), task3());

    let elapsed = now() - started;

    println(first);
    println(second);
    println(third);
    println("All tasks completed in " + elapsed + "ms");
    // Total time is about 1500ms (the longest task), not the 3300ms sum.
}

使用 spawn 创建并发任务

// Background job used by the spawn example: logs start and completion around
// an await of sleep_async(random_int(500, 2000)), then returns `id * 2`.
async fn background_task(id: int) -> int {
    println("Background task " + id + " started");
    await sleep_async(random_int(500, 2000));
    println("Background task " + id + " completed");
    return id * 2;
}

// Spawns three background tasks, does other work, then joins all three and
// prints their combined results.
async fn main() {
    // Start several background tasks
    let task1 = spawn(background_task(1));
    let task2 = spawn(background_task(2));
    let task3 = spawn(background_task(3));

    println("All tasks spawned, doing other work...");

    // Do other work
    await sleep_async(1000);
    println("Other work completed");

    // Wait for all tasks to finish
    let results = await join!(task1, task2, task3);
    println("Task results: " + results);
}

任务取消

// Cancellable task: iterates `i` over 0..100, checking is_cancelled() at the
// start of each step and returning Err as soon as it reports true; otherwise
// it waits, prints progress, and finally returns Ok.
// NOTE(review): the return type is a bare `Result` while other examples spell
// out type parameters (e.g. `Result<(), str>`) — confirm which form is intended.
async fn long_running_task() -> Result {
    loop i in 0..100 {
        // Check whether the task has been cancelled
        if is_cancelled() {
            return Result::Err("Task was cancelled");
        }

        await sleep_async(100);
        println("Progress: " + i + "%");
    }

    return Result::Ok("Task completed successfully");
}

// Spawns long_running_task, cancels it after a 3-second wait, and prints the
// Result the task ends with.
async fn main() {
    let task = spawn(long_running_task());

    // Wait 3 seconds, then cancel the task
    await sleep_async(3000);
    task.cancel();

    match await task {
        Result::Ok(result) => println("Task result: " + result),
        Result::Err(error) => println("Task error: " + error),
    }
}

💾 异步 I/O

文件操作

// Copies the file at `src` to `dst` by reading it whole and writing it back out.
// NOTE(review): `?` presumably forwards a read or write error to the caller —
// confirm against the Fleet reference.
async fn copy_file_async(src: str, dst: str) -> Result<(), str> {
    println("Starting file copy: " + src + " -> " + dst);

    // Asynchronous read of the source file.
    let data = await read_file_async(src)?;

    // Asynchronous write of the destination file.
    await write_file_async(dst, data)?;

    println("File copy completed");
    return Result::Ok(());
}

// Processes every file in `files` in its own spawned task: reads it,
// upper-cases the content, and writes it to "<file>.processed".
// Per-file outcomes are printed; the function itself always returns
// Result::Ok(()), even when individual files failed.
async fn process_multiple_files(files: [str]) -> Result<(), str> {
    // Process the files concurrently, one spawned task per file
    let tasks = map(files, |file| {
        return spawn(async {
            let content = await read_file_async(file)?;
            let processed = content.to_uppercase();
            let output_file = file + ".processed";
            await write_file_async(output_file, processed)?;
            return Result::Ok(output_file);
        });
    });

    // Wait for all tasks to finish
    let results = await join_all(tasks);

    loop result in results {
        match result {
            Result::Ok(filename) => println("Processed: " + filename),
            Result::Err(error) => println("Error: " + error),
        }
    }

    return Result::Ok(());
}

// Runs process_multiple_files over three sample file names and prints the
// overall outcome.
async fn main() {
    let files = ["file1.txt", "file2.txt", "file3.txt"];

    match await process_multiple_files(files) {
        Result::Ok(_) => println("All files processed"),
        Result::Err(error) => println("Processing failed: " + error),
    }
}

网络操作

// Sends GET requests to three URLs, one spawned task per URL, and prints up
// to the first 100 characters of each response body.
// Per-request failures are printed; the function itself always returns
// Result::Ok(()).
async fn http_client_example() -> Result<(), str> {
    // Create the HTTP client
    let client = HttpClient::new();

    // URLs to request concurrently
    let urls = [
        "https://api.github.com/users/octocat",
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://httpbin.org/uuid",
    ];

    let requests = map(urls, |url| {
        return spawn(async {
            let response = await client.get(url)?;
            let body = await response.text()?;
            return Result::Ok((url, body));
        });
    });

    let results = await join_all(requests);

    loop result in results {
        match result {
            Result::Ok((url, body)) => {
                println("Response from " + url + ":");
                // min(...) keeps the slice within bodies shorter than 100 characters
                println(body.substring(0, min(100, body.len())) + "...");
                println("---");
            },
            Result::Err(error) => println("Request failed: " + error),
        }
    }

    return Result::Ok(());
}

// Connects to an echo WebSocket endpoint, prints incoming messages from a
// spawned receive task, sends three messages, then closes the connection and
// waits for the receive task to end.
async fn websocket_example() -> Result<(), str> {
    // Connect to the WebSocket
    let ws = await WebSocket::connect("wss://echo.websocket.org")?;

    // Start the message-receiving task; it loops until receive() returns Err
    let receive_task = spawn(async {
        loop {
            match await ws.receive() {
                Result::Ok(message) => println("Received: " + message),
                Result::Err(error) => {
                    println("WebSocket error: " + error);
                    break;
                },
            }
        }
    });

    // Send a few messages
    await ws.send("Hello, WebSocket!")?;
    await sleep_async(1000);
    await ws.send("This is Fleet language")?;
    await sleep_async(1000);
    await ws.send("Goodbye!")?;

    // Wait a while, then close the connection
    await sleep_async(2000);
    await ws.close()?;

    // Wait for the receive task to finish
    // NOTE(review): this relies on receive() returning Err after close() so
    // the loop above breaks — confirm.
    await receive_task;

    return Result::Ok(());
}

❌ 错误处理

异步错误传播

// Loads a user's record, profile, and settings with three sequential
// api_call requests and bundles them into a UserData value.
// NOTE(review): `?` presumably returns the first failing call's error to the
// caller — confirm against the Fleet reference.
async fn fetch_user_data(user_id: int) -> Result<UserData, ApiError> {
    // Fetch the user's basic information
    let user = await api_call("/users/" + user_id)?;

    // Fetch the user's detailed profile
    let profile = await api_call("/users/" + user_id + "/profile")?;

    // Fetch the user's settings
    let settings = await api_call("/users/" + user_id + "/settings")?;

    return Result::Ok(UserData {
        user: user,
        profile: profile,
        settings: settings,
    });
}

// Wraps fetch_user_data for a request handler: formats the data on success
// and maps each ApiError variant to a plain error string.
// NOTE(review): only NotFound, Unauthorized and NetworkError are matched;
// ApiError's definition is not visible here — confirm the match is exhaustive.
async fn handle_user_request(user_id: int) -> Result<str, str> {
    match await fetch_user_data(user_id) {
        Result::Ok(data) => {
            let response = format_user_response(data);
            return Result::Ok(response);
        },
        Result::Err(ApiError::NotFound) => {
            return Result::Err("User not found");
        },
        Result::Err(ApiError::Unauthorized) => {
            return Result::Err("Access denied");
        },
        Result::Err(ApiError::NetworkError(msg)) => {
            return Result::Err("Network error: " + msg);
        },
    }
}

超时处理

// Races `future` against a timer of `timeout_ms`.
// Returns Ok(value) if `future` finishes first, or
// Err("Operation timed out") if the timer finishes first.
// NOTE(review): the type parameter `T` is used without a visible declaration
// and the return type is a bare `Result` — confirm the intended signature.
async fn with_timeout(
    future: Future[T],
    timeout_ms: int
) -> Result {
    let timeout_future = async {
        await sleep_async(timeout_ms);
        return Result::Err("Operation timed out");
    };

    let main_future = async {
        let result = await future;
        return Result::Ok(result);
    };

    // Run both and return the result of whichever completes first
    return await race!(main_future, timeout_future);
}

// Deliberately slow operation used to trigger the timeout in the example.
async fn slow_operation() -> str {
    await sleep_async(5000);  // 5-second operation
    return "Operation completed";
}

// Runs slow_operation (5000) under a 3000 timeout, so the Err branch is the
// expected outcome.
async fn main() {
    match await with_timeout(slow_operation(), 3000) {
        Result::Ok(result) => println("Success: " + result),
        Result::Err(error) => println("Error: " + error),  // "Operation timed out"
    }
}

重试机制

// Calls `operation` until it returns Ok or `max_attempts` attempts have been
// made, awaiting sleep_async(delay_ms) between attempts.
// Returns the first Ok value, or the error from the final attempt.
// The operation always runs at least once, even when `max_attempts` is 0 or
// negative, because the attempt counter is checked only after a failure.
async fn retry_async(
    operation: fn() -> Future[Result],
    max_attempts: int,
    delay_ms: int
) -> Result {
    var attempts = 0;

    loop {
        attempts = attempts + 1;

        match await operation() {
            Result::Ok(value) => return Result::Ok(value),
            Result::Err(error) => {
                // Out of attempts: surface the most recent error
                if attempts >= max_attempts {
                    return Result::Err(error);
                }

                println("Attempt " + attempts + " failed, retrying in " + delay_ms + "ms...");
                await sleep_async(delay_ms);
            },
        }
    }
}

// Simulates a flaky API call: returns Err("Network error") whenever
// random() < 0.7, and Ok("API response data") otherwise.
// NOTE(review): that is roughly a 70% failure rate only if random() is
// uniform over [0, 1) — confirm.
async fn unreliable_api_call() -> Result<str, str> {
    // Simulate an unstable API call
    if random() < 0.7 {
        return Result::Err("Network error");
    } else {
        return Result::Ok("API response data");
    }
}

// Retries unreliable_api_call up to 3 attempts with a delay of 1000 between
// them, then prints the final outcome.
async fn main() {
    match await retry_async(unreliable_api_call, 3, 1000) {
        Result::Ok(data) => println("Got data: " + data),
        Result::Err(error) => println("Failed after retries: " + error),
    }
}

🏗️ 实际应用

异步Web服务器

// Request value consumed by handle_request, which reads `path` and `method`.
// NOTE(review): `map` appears without key/value types — they may have been
// lost in extraction; confirm the intended type.
struct HttpRequest {
    method: str,
    path: str,
    headers: map,
    body: str,
}

// Response value produced by handle_request: status code, headers, and body.
// NOTE(review): `map` appears without key/value types — confirm the intended type.
struct HttpResponse {
    status: int,
    headers: map,
    body: str,
}

// Routes a request by exact path match:
//   "/api/users"  - GET returns 200 with the users as JSON; any other method returns 405
//   "/api/health" - returns 200 "OK" regardless of method
//   anything else - returns 404
async fn handle_request(request: HttpRequest) -> HttpResponse {
    match request.path {
        "/api/users" => {
            if request.method == "GET" {
                let users = await fetch_users_from_db();
                return HttpResponse {
                    status: 200,
                    headers: map{"Content-Type": "application/json"},
                    body: serialize_json(users),
                };
            } else {
                return HttpResponse {
                    status: 405,
                    headers: map{},
                    body: "Method not allowed",
                };
            }
        },
        "/api/health" => {
            return HttpResponse {
                status: 200,
                headers: map{"Content-Type": "text/plain"},
                body: "OK",
            };
        },
        // Fallback for every unmatched path
        _ => {
            return HttpResponse {
                status: 404,
                headers: map{},
                body: "Not found",
            };
        },
    }
}

// Binds a TCP listener on 127.0.0.1:`port` and accepts connections in an
// endless loop, handing each one to its own spawned task.
// The loop has no break, so the function returns only through a `?` on
// bind or accept.
// NOTE(review): handle_connection is not defined in this snippet — confirm
// where it comes from and how it relates to handle_request.
async fn start_server(port: int) -> Result<(), str> {
    println("Starting server on port " + port);

    let server = await TcpListener::bind("127.0.0.1:" + port)?;

    loop {
        let (stream, addr) = await server.accept()?;
        println("New connection from " + addr);

        // Start one async task per connection
        spawn(async {
            match await handle_connection(stream) {
                Result::Ok(_) => println("Connection handled successfully"),
                Result::Err(error) => println("Connection error: " + error),
            }
        });
    }
}

// Starts the server on port 8080 and prints how it ended.
async fn main() {
    match await start_server(8080) {
        Result::Ok(_) => println("Server stopped"),
        Result::Err(error) => println("Server error: " + error),
    }
}

异步数据处理管道

// One record flowing through the pipeline example.
struct DataItem {
    id: int,
    content: str,
    timestamp: int,  // set from now() in data_source
}

// Async generator that yields 100 DataItem values with ids 1 through 100,
// awaiting sleep_async(100) before each one.
async fn data_source() -> AsyncIterator[DataItem] {
    var id = 1;
    loop {
        await sleep_async(100);  // simulated interval between arriving items

        yield DataItem {
            id: id,
            content: "Data item " + id,
            timestamp: now(),
        };

        id = id + 1;

        // Stop once item 100 has been yielded
        if id > 100 {
            break;
        }
    }
}

// Returns a new DataItem with the same id and timestamp as `item` and its
// content upper-cased.
async fn process_item(item: DataItem) -> DataItem {
    // Simulated processing time
    await sleep_async(random_int(50, 200));

    return DataItem {
        id: item.id,
        content: item.content.to_uppercase(),
        timestamp: item.timestamp,
    };
}

// Stand-in for a database write: waits, prints the item, and always returns
// Result::Ok(()).
async fn save_item(item: DataItem) -> Result<(), str> {
    // Simulated save to a database
    await sleep_async(50);
    println("Saved item " + item.id + ": " + item.content);
    return Result::Ok(());
}

// Pulls items from data_source, collects them into batches of `batch_size`,
// and for each batch processes all items concurrently and then saves them
// concurrently. Save failures are printed, not returned; the function
// returns Result::Ok(()) once the source is exhausted.
async fn data_pipeline() -> Result<(), str> {
    let source = data_source();
    var buffer = [];
    let batch_size = 5;

    await for item in source {
        buffer.push(item);

        if buffer.len() >= batch_size {
            // Process the whole batch concurrently
            let processed_tasks = map(buffer, |item| {
                return spawn(process_item(item));
            });

            let processed_items = await join_all(processed_tasks);

            // Save the whole batch concurrently
            let save_tasks = map(processed_items, |item| {
                return spawn(save_item(item));
            });

            let save_results = await join_all(save_tasks);

            // Check the save results
            loop result in save_results {
                match result {
                    Result::Ok(_) => {},  // success
                    Result::Err(error) => println("Save error: " + error),
                }
            }

            buffer.clear();
        }
    }

    // Handle the items left over when the source ends mid-batch
    if !buffer.is_empty() {
        let processed_tasks = map(buffer, |item| spawn(process_item(item)));
        let processed_items = await join_all(processed_tasks);
        let save_tasks = map(processed_items, |item| spawn(save_item(item)));
        let save_results = await join_all(save_tasks);

        // Report failures for the final partial batch as well, so they are
        // not silently dropped
        loop result in save_results {
            match result {
                Result::Ok(_) => {},  // success
                Result::Err(error) => println("Save error: " + error),
            }
        }
    }

    return Result::Ok(());
}

// Runs the data pipeline once and prints how it ended.
async fn main() {
    println("Starting data pipeline...");

    match await data_pipeline() {
        Result::Ok(_) => println("Data pipeline completed successfully"),
        Result::Err(error) => println("Pipeline error: " + error),
    }
}

🎯 最佳实践

异步编程指南

  1. 避免阻塞操作 - 在异步函数中使用异步版本的I/O操作
  2. 合理使用并发 - 不要过度并发,考虑系统资源限制
  3. 错误处理 - 异步操作的错误处理同样重要
  4. 取消支持 - 长时间运行的任务应支持取消
  5. 背压处理 - 在数据流处理中考虑背压问题

性能优化

// ✅ Good practice - batch processing
// Spawns one process_item task per item and awaits them all together.
// NOTE(review): `[Result]` has no type parameters here — confirm the intended type.
async fn process_items_batch(items: [Item]) -> [Result] {
    let tasks = map(items, |item| spawn(process_item(item)));
    return await join_all(tasks);
}

// ❌ Practice to avoid - processing one item at a time
// Each process_item call is awaited before the next one starts.
async fn process_items_sequential(items: [Item]) -> [Result] {
    var results = [];
    loop item in items {
        let result = await process_item(item);
        results.push(result);
    }
    return results;
}

🔗 相关主题

  • 错误处理 - 异步错误处理模式
  • 并发编程 - 线程和并发原语
  • 网络编程 - 网络相关的异步操作
  • 性能优化 - 异步程序的性能优化