Implement mpsc using semaphores.
This commit is contained in:
parent
9bc1e8d2e9
commit
45c33f2ce2
8 changed files with 187 additions and 2 deletions
|
@ -1,5 +1,7 @@
|
||||||
mod up;
|
mod up;
|
||||||
mod mutex;
|
mod mutex;
|
||||||
|
mod semaphore;
|
||||||
|
|
||||||
pub use up::UPSafeCell;
|
pub use up::UPSafeCell;
|
||||||
pub use mutex::{Mutex, MutexSpin, MutexBlocking};
|
pub use mutex::{Mutex, MutexSpin, MutexBlocking};
|
||||||
|
pub use semaphore::Semaphore;
|
||||||
|
|
45
os/src/sync/semaphore.rs
Normal file
45
os/src/sync/semaphore.rs
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
use alloc::{sync::Arc, collections::VecDeque};
|
||||||
|
use crate::task::{add_task, TaskControlBlock, current_task, block_current_and_run_next};
|
||||||
|
use crate::sync::UPSafeCell;
|
||||||
|
|
||||||
|
pub struct Semaphore {
|
||||||
|
pub inner: UPSafeCell<SemaphoreInner>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SemaphoreInner {
|
||||||
|
pub count: isize,
|
||||||
|
pub wait_queue: VecDeque<Arc<TaskControlBlock>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Semaphore {
|
||||||
|
pub fn new(res_count: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: unsafe { UPSafeCell::new(
|
||||||
|
SemaphoreInner {
|
||||||
|
count: res_count as isize,
|
||||||
|
wait_queue: VecDeque::new(),
|
||||||
|
}
|
||||||
|
)},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn up(&self) {
|
||||||
|
let mut inner = self.inner.exclusive_access();
|
||||||
|
inner.count += 1;
|
||||||
|
if inner.count <= 0 {
|
||||||
|
if let Some(task) = inner.wait_queue.pop_front() {
|
||||||
|
add_task(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn down(&self) {
|
||||||
|
let mut inner = self.inner.exclusive_access();
|
||||||
|
inner.count -= 1;
|
||||||
|
if inner.count < 0 {
|
||||||
|
inner.wait_queue.push_back(current_task().unwrap());
|
||||||
|
drop(inner);
|
||||||
|
block_current_and_run_next();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,6 +18,9 @@ const SYSCALL_WAITTID: usize = 1002;
|
||||||
const SYSCALL_MUTEX_CREATE: usize = 1010;
|
const SYSCALL_MUTEX_CREATE: usize = 1010;
|
||||||
const SYSCALL_MUTEX_LOCK: usize = 1011;
|
const SYSCALL_MUTEX_LOCK: usize = 1011;
|
||||||
const SYSCALL_MUTEX_UNLOCK: usize = 1012;
|
const SYSCALL_MUTEX_UNLOCK: usize = 1012;
|
||||||
|
const SYSCALL_SEMAPHORE_CREATE: usize = 1020;
|
||||||
|
const SYSCALL_SEMAPHORE_UP: usize = 1021;
|
||||||
|
const SYSCALL_SEMAPHORE_DOWN: usize = 1022;
|
||||||
|
|
||||||
mod fs;
|
mod fs;
|
||||||
mod process;
|
mod process;
|
||||||
|
@ -51,6 +54,9 @@ pub fn syscall(syscall_id: usize, args: [usize; 3]) -> isize {
|
||||||
SYSCALL_MUTEX_CREATE => sys_mutex_create(args[0] == 1),
|
SYSCALL_MUTEX_CREATE => sys_mutex_create(args[0] == 1),
|
||||||
SYSCALL_MUTEX_LOCK => sys_mutex_lock(args[0]),
|
SYSCALL_MUTEX_LOCK => sys_mutex_lock(args[0]),
|
||||||
SYSCALL_MUTEX_UNLOCK => sys_mutex_unlock(args[0]),
|
SYSCALL_MUTEX_UNLOCK => sys_mutex_unlock(args[0]),
|
||||||
|
SYSCALL_SEMAPHORE_CREATE => sys_semaphore_creare(args[0]),
|
||||||
|
SYSCALL_SEMAPHORE_UP => sys_semaphore_up(args[0]),
|
||||||
|
SYSCALL_SEMAPHORE_DOWN => sys_semaphore_down(args[0]),
|
||||||
_ => panic!("Unsupported syscall_id: {}", syscall_id),
|
_ => panic!("Unsupported syscall_id: {}", syscall_id),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use crate::task::{current_task, current_process, block_current_and_run_next};
|
use crate::task::{current_task, current_process, block_current_and_run_next};
|
||||||
use crate::sync::{MutexSpin, MutexBlocking};
|
use crate::sync::{MutexSpin, MutexBlocking, Semaphore};
|
||||||
use crate::timer::{get_time_ms, add_timer};
|
use crate::timer::{get_time_ms, add_timer};
|
||||||
use alloc::sync::Arc;
|
use alloc::sync::Arc;
|
||||||
|
|
||||||
|
@ -51,3 +51,39 @@ pub fn sys_mutex_unlock(mutex_id: usize) -> isize {
|
||||||
mutex.unlock();
|
mutex.unlock();
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn sys_semaphore_creare(res_count: usize) -> isize {
|
||||||
|
let process = current_process();
|
||||||
|
let mut process_inner = process.inner_exclusive_access();
|
||||||
|
let id = if let Some(id) = process_inner
|
||||||
|
.semaphore_list
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.find(|(_, item)| item.is_none())
|
||||||
|
.map(|(id, _)| id) {
|
||||||
|
process_inner.semaphore_list[id] = Some(Arc::new(Semaphore::new(res_count)));
|
||||||
|
id
|
||||||
|
} else {
|
||||||
|
process_inner.semaphore_list.push(Some(Arc::new(Semaphore::new(res_count))));
|
||||||
|
process_inner.semaphore_list.len() - 1
|
||||||
|
};
|
||||||
|
id as isize
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sys_semaphore_up(sem_id: usize) -> isize {
|
||||||
|
let process = current_process();
|
||||||
|
let process_inner = process.inner_exclusive_access();
|
||||||
|
let sem = Arc::clone(process_inner.semaphore_list[sem_id].as_ref().unwrap());
|
||||||
|
drop(process_inner);
|
||||||
|
sem.up();
|
||||||
|
0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sys_semaphore_down(sem_id: usize) -> isize {
|
||||||
|
let process = current_process();
|
||||||
|
let process_inner = process.inner_exclusive_access();
|
||||||
|
let sem = Arc::clone(process_inner.semaphore_list[sem_id].as_ref().unwrap());
|
||||||
|
drop(process_inner);
|
||||||
|
sem.down();
|
||||||
|
0
|
||||||
|
}
|
||||||
|
|
|
@ -4,7 +4,7 @@ use crate::mm::{
|
||||||
translated_refmut,
|
translated_refmut,
|
||||||
};
|
};
|
||||||
use crate::trap::{TrapContext, trap_handler};
|
use crate::trap::{TrapContext, trap_handler};
|
||||||
use crate::sync::{UPSafeCell, Mutex};
|
use crate::sync::{UPSafeCell, Mutex, Semaphore};
|
||||||
use core::cell::RefMut;
|
use core::cell::RefMut;
|
||||||
use super::id::RecycleAllocator;
|
use super::id::RecycleAllocator;
|
||||||
use super::TaskControlBlock;
|
use super::TaskControlBlock;
|
||||||
|
@ -33,6 +33,7 @@ pub struct ProcessControlBlockInner {
|
||||||
pub tasks: Vec<Option<Arc<TaskControlBlock>>>,
|
pub tasks: Vec<Option<Arc<TaskControlBlock>>>,
|
||||||
pub task_res_allocator: RecycleAllocator,
|
pub task_res_allocator: RecycleAllocator,
|
||||||
pub mutex_list: Vec<Option<Arc<dyn Mutex>>>,
|
pub mutex_list: Vec<Option<Arc<dyn Mutex>>>,
|
||||||
|
pub semaphore_list: Vec<Option<Arc<Semaphore>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ProcessControlBlockInner {
|
impl ProcessControlBlockInner {
|
||||||
|
@ -97,6 +98,7 @@ impl ProcessControlBlock {
|
||||||
tasks: Vec::new(),
|
tasks: Vec::new(),
|
||||||
task_res_allocator: RecycleAllocator::new(),
|
task_res_allocator: RecycleAllocator::new(),
|
||||||
mutex_list: Vec::new(),
|
mutex_list: Vec::new(),
|
||||||
|
semaphore_list: Vec::new(),
|
||||||
})}
|
})}
|
||||||
});
|
});
|
||||||
// create a main thread, we should allocate ustack and trap_cx here
|
// create a main thread, we should allocate ustack and trap_cx here
|
||||||
|
@ -210,6 +212,7 @@ impl ProcessControlBlock {
|
||||||
tasks: Vec::new(),
|
tasks: Vec::new(),
|
||||||
task_res_allocator: RecycleAllocator::new(),
|
task_res_allocator: RecycleAllocator::new(),
|
||||||
mutex_list: Vec::new(),
|
mutex_list: Vec::new(),
|
||||||
|
semaphore_list: Vec::new(),
|
||||||
})}
|
})}
|
||||||
});
|
});
|
||||||
// add child
|
// add child
|
||||||
|
|
69
user/src/bin/mpsc_sem.rs
Normal file
69
user/src/bin/mpsc_sem.rs
Normal file
|
@ -0,0 +1,69 @@
|
||||||
|
#![no_std]
|
||||||
|
#![no_main]
|
||||||
|
|
||||||
|
#[macro_use]
|
||||||
|
extern crate user_lib;
|
||||||
|
|
||||||
|
extern crate alloc;
|
||||||
|
|
||||||
|
use user_lib::{semaphore_create, semaphore_up, semaphore_down};
|
||||||
|
use user_lib::{thread_create, waittid};
|
||||||
|
use user_lib::exit;
|
||||||
|
use alloc::vec::Vec;
|
||||||
|
|
||||||
|
const SEM_MUTEX: usize = 0;
|
||||||
|
const SEM_EMPTY: usize = 1;
|
||||||
|
const SEM_EXISTED: usize = 2;
|
||||||
|
const BUFFER_SIZE: usize = 8;
|
||||||
|
static mut BUFFER: [usize; BUFFER_SIZE] = [0; BUFFER_SIZE];
|
||||||
|
static mut FRONT: usize = 0;
|
||||||
|
static mut TAIL: usize = 0;
|
||||||
|
const PRODUCER_COUNT: usize = 4;
|
||||||
|
const NUMBER_PER_PRODUCER: usize = 100;
|
||||||
|
|
||||||
|
unsafe fn producer(id: *const usize) -> ! {
|
||||||
|
let id = *id;
|
||||||
|
for _ in 0..NUMBER_PER_PRODUCER {
|
||||||
|
semaphore_down(SEM_EMPTY);
|
||||||
|
semaphore_down(SEM_MUTEX);
|
||||||
|
BUFFER[FRONT] = id;
|
||||||
|
FRONT = (FRONT + 1) % BUFFER_SIZE;
|
||||||
|
semaphore_up(SEM_MUTEX);
|
||||||
|
semaphore_up(SEM_EXISTED);
|
||||||
|
}
|
||||||
|
exit(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe fn consumer() -> ! {
|
||||||
|
for _ in 0..PRODUCER_COUNT * NUMBER_PER_PRODUCER {
|
||||||
|
semaphore_down(SEM_EXISTED);
|
||||||
|
semaphore_down(SEM_MUTEX);
|
||||||
|
print!("{} ", BUFFER[TAIL]);
|
||||||
|
TAIL = (TAIL + 1) % BUFFER_SIZE;
|
||||||
|
semaphore_up(SEM_MUTEX);
|
||||||
|
semaphore_up(SEM_EMPTY);
|
||||||
|
}
|
||||||
|
println!("");
|
||||||
|
exit(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[no_mangle]
|
||||||
|
pub fn main() -> i32 {
|
||||||
|
// create semaphores
|
||||||
|
assert_eq!(semaphore_create(1) as usize, SEM_MUTEX);
|
||||||
|
assert_eq!(semaphore_create(BUFFER_SIZE) as usize, SEM_EMPTY);
|
||||||
|
assert_eq!(semaphore_create(0) as usize, SEM_EXISTED);
|
||||||
|
// create threads
|
||||||
|
let ids: Vec<_> = (0..PRODUCER_COUNT).collect();
|
||||||
|
let mut threads = Vec::new();
|
||||||
|
for i in 0..PRODUCER_COUNT {
|
||||||
|
threads.push(thread_create(producer as usize, &ids.as_slice()[i] as *const _ as usize));
|
||||||
|
}
|
||||||
|
threads.push(thread_create(consumer as usize, 0));
|
||||||
|
// wait for all threads to complete
|
||||||
|
for thread in threads.iter() {
|
||||||
|
waittid(*thread as usize);
|
||||||
|
}
|
||||||
|
println!("mpsc_sem passed!");
|
||||||
|
0
|
||||||
|
}
|
|
@ -119,4 +119,13 @@ pub fn mutex_create() -> isize { sys_mutex_create(false) }
|
||||||
pub fn mutex_blocking_create() -> isize { sys_mutex_create(true) }
|
pub fn mutex_blocking_create() -> isize { sys_mutex_create(true) }
|
||||||
pub fn mutex_lock(mutex_id: usize) { sys_mutex_lock(mutex_id); }
|
pub fn mutex_lock(mutex_id: usize) { sys_mutex_lock(mutex_id); }
|
||||||
pub fn mutex_unlock(mutex_id: usize) { sys_mutex_unlock(mutex_id); }
|
pub fn mutex_unlock(mutex_id: usize) { sys_mutex_unlock(mutex_id); }
|
||||||
|
pub fn semaphore_create(res_count: usize) -> isize {
|
||||||
|
sys_semaphore_create(res_count)
|
||||||
|
}
|
||||||
|
pub fn semaphore_up(sem_id: usize) {
|
||||||
|
sys_semaphore_up(sem_id);
|
||||||
|
}
|
||||||
|
pub fn semaphore_down(sem_id: usize) {
|
||||||
|
sys_semaphore_down(sem_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,9 @@ const SYSCALL_WAITTID: usize = 1002;
|
||||||
const SYSCALL_MUTEX_CREATE: usize = 1010;
|
const SYSCALL_MUTEX_CREATE: usize = 1010;
|
||||||
const SYSCALL_MUTEX_LOCK: usize = 1011;
|
const SYSCALL_MUTEX_LOCK: usize = 1011;
|
||||||
const SYSCALL_MUTEX_UNLOCK: usize = 1012;
|
const SYSCALL_MUTEX_UNLOCK: usize = 1012;
|
||||||
|
const SYSCALL_SEMAPHORE_CREATE: usize = 1020;
|
||||||
|
const SYSCALL_SEMAPHORE_UP: usize = 1021;
|
||||||
|
const SYSCALL_SEMAPHORE_DOWN: usize = 1022;
|
||||||
|
|
||||||
fn syscall(id: usize, args: [usize; 3]) -> isize {
|
fn syscall(id: usize, args: [usize; 3]) -> isize {
|
||||||
let mut ret: isize;
|
let mut ret: isize;
|
||||||
|
@ -113,3 +116,15 @@ pub fn sys_mutex_lock(id: usize) -> isize {
|
||||||
pub fn sys_mutex_unlock(id: usize) -> isize {
|
pub fn sys_mutex_unlock(id: usize) -> isize {
|
||||||
syscall(SYSCALL_MUTEX_UNLOCK, [id, 0, 0])
|
syscall(SYSCALL_MUTEX_UNLOCK, [id, 0, 0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn sys_semaphore_create(res_count: usize) -> isize {
|
||||||
|
syscall(SYSCALL_SEMAPHORE_CREATE, [res_count, 0, 0])
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sys_semaphore_up(sem_id: usize) -> isize {
|
||||||
|
syscall(SYSCALL_SEMAPHORE_UP, [sem_id, 0, 0])
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sys_semaphore_down(sem_id: usize) -> isize {
|
||||||
|
syscall(SYSCALL_SEMAPHORE_DOWN, [sem_id, 0, 0])
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue