리눅스 시스템 응용 설계 term project였던 리눅스 커널 분석이다.
Mutex 코드 분석을 맡았고, 분석했던 걸 정리하였다.
딥한 부분을 다루기보다는 Mutex의 코드 프로세스에 초점을 두었다.
kernel version은 linux 5.4.214이다.
Mutex 개념
정의
- 한 스레드, 프로세스에 의해 소유될 수 있는 key를 기반으로 한 상호배제기법
- 자원에 대한 접근을 동기화하기 위해 사용
- 프로그램이 시작될 때 고유한 이름으로 생성
- Locking 메커니즘
- 오직 하나의 스레드만 동일한 시점에 mutex를 얻어 critical section에 들어올 수 있다. 또한 오직 이 스레드만이 critical section에서 나갈 때 mutex를 해제할 수 있다.
Mutex Busy Waiting?
- CPU가 계속해서 무한 루프를 돌며 확인하는 것이 아니다.
- 어떤 스레드가 공유자원을 획득했을 때 해당 스레드가 공유 자원을 모두 사용할 때까지 다른 스레드가 기다리는 것
- → 쓸데 없이 기다리는 스레드 공유 자원에 접근할 수 있는지 지속적으로 확인할 필요 없이, 알아서 깨워주기 때문에 자원 낭비가 덜하다.
Mutex mechanism 특징
- Atomicity
- mutex lock은 atomic operation 으로 작동
- 하나의 스레드가 mutex를 이용해서 lock을 시도하는 도중에 다른 스레드가 mutex lock을 할 수 없도록 한다.
- 한 번에 하나의 mutex lock을 하도록 보증한다.
- Singularity
- 만약 스레드가 mutex lock을 했다면 lock을 건 스레드가 mutex lock을 해제하기 전까지 어떠한 스레드도 mutex lock을 할 수 없도록 보증한다.
- Non-Busy Wait
- 하나의 스레드가 mutex lock을 시도하는데 이미 다른 스레드 mutex lock을 사용하고 있을 때 이 스레드는 다른 스레드가 lock을 해제하기 전까지 해당 지점에 머물러 있으며, 이 동안은 어떠한 CPU 자원도 소비하지 않는다. (예를 들면 sleep)
예시 - 레스토랑 화장실
- 화장실이 하나 밖에 없는 식당. 화장실을 가기 위해서는 카운터에서 열쇠를 받아 가야 한다.
- 카운터에 키가 있다 → 화장실에 사람이 없다는 뜻. 열쇠를 들고 화장실에 들어갈 수 있다.
- 카운터에 키가 없다 → 화장실을 누군가 사용하고 있다는 뜻. 열쇠가 없으니 화장실에 들어갈 수 없다. 누군가가 나올 때까지 카운터에서 기다려야 한다.
- 누군가가 카운터에 키를 돌려놓았을 경우, 기다리는 사람들은 카운터에 도착한 순서대로 키를 받을 수 있다.
- 화장실을 이용하는 사람 : 프로세스 or 스레드
- 화장실 : 공유자원
- 화장실 키 : 공유자원에 접근하기 위해 필요한 object
→ Mutex는 key에 해당하는 어떤 오브젝트가 있을 때 해당 object를 소유한 스레드/프로세스 만이 공유자원에 접근할 수 있다.
Semaphore
- semaphore의 count를 1로 설정하면 mutex처럼 활용할 수 있다.
Mutex 코드 사용
Mutex 만들기
선언과 초기화를 위해 PTHREAD_MUTEX_INITIALIZER 상수를 할당
Mutex 정보는 pthread_mutex_t 타입에 저장한다.
pthread_mutex_t a_mutex = PTHREAD_MUTEX_INITIALIZER;
Mutex 코드 분석
kernel/locking/mutex.c
해당 파일에 존재하지 않는 코드들도 포함되어 있다. (mutex 코드를 이해하기 위해 필요했던 코드들)
/*
* @owner: contains: 'struct task_struct *' to the current lock owner,
* NULL means not owned. Since task_struct pointers are aligned at
* at least L1_CACHE_BYTES, we have low bits to store extra state.
*
* Bit0 indicates a non-empty waiter list; unlock must issue a wakeup.
* Bit1 indicates unlock needs to hand the lock to the top-waiter
* Bit2 indicates handoff has been done and we're waiting for pickup.
*/
// owner가 NULL인 경우는 mutex가 누구에게도 소유되지 않은 것
// Bit0 : waiter list가 비어있지 않다.
// Bit1 : top-waiter에게 lock을 전달하기 위해 unlock이 필요하다.
// Bit2 : handoff가 마무리 되었고, pickup을 기다리고 있다.
#define MUTEX_FLAG_WAITERS 0x01
#define MUTEX_FLAG_HANDOFF 0x02
#define MUTEX_FLAG_PICKUP 0x04
// Bit를 알아내기 위해 bit & 연산하는 용도
#define MUTEX_FLAGS 0x07
__mutex_init 함수
void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
atomic_long_set(&lock->owner, 0); // lock의 owner를 0으로 초기화
spin_lock_init(&lock->wait_lock); // wait_lock (spin lock) 초기화
INIT_LIST_HEAD(&lock->wait_list); // wait_list (리스트) 초기화
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
osq_lock_init(&lock->osq);
#endif
debug_mutex_init(lock, name, key);
}
atomic operation으로 lock의 owner를 0으로 초기화한다.
wait_lock을 초기화한다. wait_lock은 wait_list를 위한 lock
wait_list 를 초기화한다.
INIT_LIST_HEAD
static inline void INIT_LIST_HEAD(struct list_head *list) {
list->next = list;
list->prev = list;
} // 리스트 초기화 함수
수업 시간에 배웠던 리스트 초기화 함수
__mutex_trylock_or_owner
static inline struct task_struct *__mutex_trylock_or_owner(struct mutex *lock)
{
unsigned long owner, curr = (unsigned long)current; // 현재 실행되고 있는 task
owner = atomic_long_read(&lock->owner); // lock의 owner 알아내기
for (;;) { /* must loop, can race against a flag */
unsigned long old, flags = __owner_flags(owner); // owner와 MUTEX_FLAGS & 연산한 후 flag를 가져온다.
unsigned long task = owner & ~MUTEX_FLAGS; //
if (task) { // task가 0b00000xxx가 아닌 경우
if (likely(task != curr)) // 해당 스레드가 owner에 접근한 이후로 owner에 누군가 접근했는지 확인
// 즉 중간에 스레드가 owner 값을 변경하지 않은 경우 break
break;
if (likely(!(flags & MUTEX_FLAG_PICKUP))) // list가 비어있지 않은 경우 break
break;
flags &= ~MUTEX_FLAG_PICKUP;
} else {
#ifdef CONFIG_DEBUG_MUTEXES
DEBUG_LOCKS_WARN_ON(flags & MUTEX_FLAG_PICKUP);
#endif
}
/*
* We set the HANDOFF bit, we must make sure it doesn't live
* past the point where we acquire it. This would be possible
* if we (accidentally) set the bit on an unlocked mutex.
*/
flags &= ~MUTEX_FLAG_HANDOFF;
old = atomic_long_cmpxchg_acquire(&lock->owner, owner, curr | flags);
if (old == owner)
return NULL; // lock 얻는데 성공
owner = old;
}
return __owner_task(owner); // 현재 lock을 갖고 있는 owner 리턴
}
current 변수는 현재 실행되고 있는 task에 접근할 수 있다.
atomic operation으로 lock의 owner를 알아낸다.
owner와 MUTEX_FLAGS를 비트 & 연산한 후 결과값 (task)을 얻는다. task가 특정 값이 아닌 경우 해당 스레드가 owner에 접근한 이후로 아무도 접근하지 않았다면 반복문을 벗어난다.
list가 비어있지 않은 경우에도 break.
atomic_long_cmpxchg_acquire로 lock의 owner와 owner를 비교한 후, 같으면 lock을 얻는데 성공한 것
__mutex_trylock
static inline bool __mutex_trylock(struct mutex *lock)
{
return !__mutex_trylock_or_owner(lock); // mutex_trylock_or_owner가 NULL이면 true
}
__mutex_trylock_fast
경쟁하지 않는 경우에만 작동하면 optimistic trylock 함수
#ifndef CONFIG_DEBUG_LOCK_ALLOC
// 해당 설정이 되어 있을 때
/*
* Optimistic trylock that only works in the uncontended case. Make sure to
* follow with a __mutex_trylock() before failing.
*/
// 경쟁하지 않는 경우에만 작동하는 optimistic trylock
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
unsigned long curr = (unsigned long)current; // 현재 실행중인 task struct
unsigned long zero = 0UL;
if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr)) //
return true;
return false;
}
static __always_inline bool __mutex_unlock_fast(struct mutex *lock)
{
unsigned long curr = (unsigned long)current;
if (atomic_long_cmpxchg_release(&lock->owner, curr, 0UL) == curr) // unlock_fast 성공
return true;
return false;
}
#endif
오직 atomic operation으로만 구성되어 있다.
mutex_waiter 함수들
static inline void __mutex_set_flag(struct mutex *lock, unsigned long flag) // flag와 lock->owner or 연산?
{
atomic_long_or(flag, &lock->owner);
}
static inline void __mutex_clear_flag(struct mutex *lock, unsigned long flag) // flag와 lock->owner andnot 연산
{
atomic_long_andnot(flag, &lock->owner);
}
static inline bool __mutex_waiter_is_first(struct mutex *lock, struct mutex_waiter *waiter)
{
// lock의 wait_list의 first struct가 입력 받은 waiter인지 확인
return list_first_entry(&lock->wait_list, struct mutex_waiter, list) == waiter;
}
/*
* Add @waiter to a given location in the lock wait_list and set the
* FLAG_WAITERS flag if it's the first waiter.
*/
static void
__mutex_add_waiter(struct mutex *lock, struct mutex_waiter *waiter,
struct list_head *list)
{
debug_mutex_add_waiter(lock, waiter, current); // 현재 task를 lock에 의해 blocked_on 된 것으로 표시
list_add_tail(&waiter->list, list); // list의 끝에 list 추가
if (__mutex_waiter_is_first(lock, waiter))
__mutex_set_flag(lock, MUTEX_FLAG_WAITERS);
}
static void
__mutex_remove_waiter(struct mutex *lock, struct mutex_waiter *waiter)
{
list_del(&waiter->list); // 리스트에서 삭제
if (likely(list_empty(&lock->wait_list)))
__mutex_clear_flag(lock, MUTEX_FLAGS);
debug_mutex_remove_waiter(lock, waiter, current);
}
__mutex_add_waiter : waiter_list에 추가할 task를 lock에 의해 blocked_on 된 것으로 표시한다.
waiter->list에 추가하고, 첫 번째 waiter인 경우 flag를 추가한다.
__mutex_remove_waiter : 리스트에서 삭제한다. wait_list가 비었을 경우 flag를 삭제한다.
__mutex_lock, __mutex_lock_common
static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip)
{
return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false); // Call mutex_lock_common from mutex_lock
}
/*
* Lock a mutex (possibly interruptible), slowpath:
*/
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct mutex_waiter waiter;
...
int ret;
...
might_sleep(); // Verify that the preempt_disable function is already invoked to disable preempt (if it is disabled, it causes error message output or kernel panic)
...
preempt_disable(); // Disable preemption
...
if (__mutex_trylock(lock) ||
mutex_optimistic_spin(lock, ww_ctx, NULL)) { // If a process has acquired lock
/* got the lock, yay! */
lock_acquired(&lock->dep_map, ip); // A function that occupies lock if there is no thread currently occupying lock; otherwise, it transfers priority to the thread
...
preempt_enable(); // If the lock is obtained, set it again so that it can be preempted.
return 0;
}
// lock을 획득하지 못한 경우
spin_lock(&lock->wait_lock); // waitor list 보호를 위한 wait_lock
/*
* After waiting to acquire the wait_lock, try again.
*/
if (__mutex_trylock(lock)) { // mutex_lock을 얻을 수 있는 경우
...
goto skip_wait;
}
debug_mutex_lock_common(lock, &waiter);
lock_contended(&lock->dep_map, ip); // lock에
...
waiter.task = current; // waiter의 task에 current task 등록
set_current_state(state); // current->state에 입력받은 state 설정
for (;;) { // 무한루프
bool first;
/*
* Once we hold wait_lock, we're serialized against
* mutex_unlock() handing the lock off to us, do a trylock
* before testing the error conditions to make sure we pick up
* the handoff.
*/
if (__mutex_trylock(lock)) // mutex lock을 얻을 수 있는 경우
goto acquired;
/*
* Check for signals and kill conditions while holding
* wait_lock. This ensures the lock cancellation is ordered
* against mutex_unlock() and wake-ups do not go missing.
*/
...
spin_unlock(&lock->wait_lock); // waitor list unlock
schedule_preempt_disabled(); // schedule_preemption disable
first = __mutex_waiter_is_first(lock, &waiter); // mutex waiter 중에서 제일 첫 번째 task인지 확인
if (first) // first인 경우 MUTEX_FLAG_HANDOFF flag marking
__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
set_current_state(state); // state 설정
/*
* Here we order against unlock; we must either see it change
* state back to RUNNING and fall through the next schedule(),
* or we must see its unlock and acquire.
*/
if (__mutex_trylock(lock) ||
(first && mutex_optimistic_spin(lock, ww_ctx, &waiter))) // 1. lock을 얻을 수 있거나 2. waitor list의 첫 번째이고 optimistic일 때
break;
// spin lock 다시 실행
spin_lock(&lock->wait_lock);
}
spin_lock(&lock->wait_lock);
acquired:
__set_current_state(TASK_RUNNING);
...
__mutex_remove_waiter(lock, &waiter);
debug_mutex_free_waiter(&waiter);
skip_wait:
/* got the lock - cleanup and rejoice! */
lock_acquired(&lock->dep_map, ip); // lock 획득
...
spin_unlock(&lock->wait_lock); // waitor list spin_unlock
preempt_enable(); // preemption 가능하도록
return 0;
err:
__set_current_state(TASK_RUNNING);
__mutex_remove_waiter(lock, &waiter);
err_early_kill:
spin_unlock(&lock->wait_lock);
debug_mutex_free_waiter(&waiter);
mutex_release(&lock->dep_map, 1, ip);
preempt_enable();
return ret;
}
발표용 코드라 코드상 생략된 부분이 있다.
__mutex_lock 함수에서 __mutex_lock_common 함수를 호출한다.
__mutex_lock__common
might_sleep() : preempt_disable 함수가 preemption을 막기 위해 불린 상황인지 검증한다. (중복을 막기 위함인 듯)
preempt_disable() : preemption disable
process가 lock을 얻은 경우, 현재 lock을 얻고자하는 스레드가 없을 때 lock을 얻는다. 만약 그렇지 않는다면 스레드에게 priority를 전달한다.
lock을 얻은 후에는 preemption을 다시 enable하게 바꾼다.
lock을 얻지 못한 경우에는 waitor list를 보호하기 위한 wait_lock (spinlock) 을 건다.
만약 wait_lock을 얻기 위해 기다린 후에, __mutex_trylock을 다시 시도한다. (얻을 수 있는 경우 skip_wait으로 이동)
waiter의 task에 current task를 등록하고, current->state 에 입력받은 state를 설정한다.
무한 루프를 돌며 lock 을 얻을 수 있는지 반복적으로 확인한다. mutex lock을 얻을 수 있는 경우에는 acquired로 이동한다.
waitor list의 spinlock을 해제하고, scheduling preemption을 disable 한다. 현재 task가 mutex의 waitor 중에서 가장 첫 번째 task라면 HANDOFF flag를 설정하고, current_state를 현재 state로 변경한다.
다시 lock을 얻을 수 있거나 waitor list의 첫 번째 이고 optimisitc인 경우 반복문을 탈출한다.
lock을 얻지 못했다면 다시 spin_lock을 실행한다.
include/asm-generic/atomic-long.h
static inline bool atomic_long_try_cmpxchg_acquire(atomic_long_t *v, long *old, long new)
{
return atomic_try_cmpxchg_acquire(v, (int*) old, new);
}
atomic_try_cmpxchg_acquire 함수는 컴파일러에 따로 구현되어 있다.
kernel/locking/mutex-debug.c
void debug_mutex_add_waiter(struct mutex *lock, struct mutex_waiter *waiter, struct task_struct *task)
{
lockdep_assert_held(&lock->wait_lock);
/* Mark the current thread as blocked on the lock: */
task->blocked_on = waiter;
}
현재 thread가 lock에 의해 block 되었다는 것을 표시하는 함수
include/linux/mutex.h
struct mutex {
atomic_long_t owner; // NULL if not owner
spinlock_t wait_lock; // spinlock wait_lock to control access of wait_list
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
struct list_head wait_list; // wait_queue
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
struct mutex {
atomic_long_t owner;
spinlock_t wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
struct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
/*
* This is the control structure for tasks blocked on mutex,
* which resides on the blocked task's kernel stack:
*/
// mutex에 의해 block된 task에 대한 control structure
// blocked task의 kernel stack에 존재
struct mutex_waiter {
struct list_head list;
struct task_struct *task; // mutex를 누가 획득했는지?
struct ww_acquire_ctx *ww_ctx;
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
};
mutex와 관련된 구조체들이 정의되어 있다.
fastpath, slowpath
- fastpath
뮤텍스는 다른 프로세스가 이미 획득하지 않은 상태면 바로 획득할 수 있습니다. 이 경우 fastpath 로 빠르게 뮤텍스를 획득하고 해제합니다.
- slowpath
fastpath 코드 흐름으로 뮤텍스 획득을 시도했는데 다른 프로세스가 이미 뮤텍스를 획득한 경우 실행하는 동작
slowpath 동작은 크게 다음과 같이 나눌 수 있습니다.
- 뮤텍스를 획득하지 못한 프로세스는 대기열에 자신을 등록하고 휴면에 들어감
- 뮤텍스를 해제한 프로세스는 뮤텍스 대기열에 등록(뮤텍스 획득을 이미 시도)한 다른 프로세스를 깨움
'School Lecture Study > Linux System Application Design' 카테고리의 다른 글
9. Memory Management (0) | 2022.12.20 |
---|---|
8. Linux Kernel Data Structure (0) | 2022.12.20 |
7-4. Task Scheduling in Linux (4) (0) | 2022.12.20 |
7-3. Task Scheduling in Linux (3) (0) | 2022.12.20 |
7-2. Task Scheduling in Linux (2) (0) | 2022.12.20 |