School Lecture Study/Linux System Application Design

리눅스 커널 분석 - Mutex

vㅔ로 2023. 1. 27. 17:25
728x90

리눅스 시스템 응용 설계 term project였던 리눅스 커널 분석이다.

Mutex 코드 분석을 맡았고, 분석했던 걸 정리하였다.

딥한 부분을 다루기보다는 Mutex의 코드 프로세스에 초점을 두었다.

kernel version은 linux 5.4.214이다.

Mutex 개념

정의

  • 한 스레드, 프로세스에 의해 소유될 수 있는 key를 기반으로 한 상호배제기법
  • 자원에 대한 접근을 동기화하기 위해 사용
  • 프로그램이 시작될 때 고유한 이름으로 생성
  • Locking 메커니즘
  • 오직 하나의 스레드만 동일한 시점에 mutex를 얻어 critical section에 들어올 수 있다. 또한 오직 이 스레드만이 critical section에서 나갈 때 mutex를 해제할 수 있다.

Mutex Busy Waiting?

  • CPU가 계속해서 무한 루프를 돌며 확인하는 것이 아니다.
  • 어떤 스레드가 공유자원을 획득했을 때 해당 스레드가 공유 자원을 모두 사용할 때까지 다른 스레드가 기다리는 것
  • → 쓸데 없이 기다리는 스레드 공유 자원에 접근할 수 있는지 지속적으로 확인할 필요 없이, 알아서 깨워주기 때문에 자원 낭비가 덜하다.

Mutex mechanism 특징

  • Atomicity
    • mutex lock은 atomic operation 으로 작동
    • 하나의 스레드가 mutex를 이용해서 lock을 시도하는 도중에 다른 스레드가 mutex lock을 할 수 없도록 한다.
    • 한 번에 하나의 mutex lock을 하도록 보증한다.
  • Singularity
    • 만약 스레드가 mutex lock을 했다면 lock을 건 스레드가 mutex lock을 해제하기 전까지 어떠한 스레드도 mutex lock을 할 수 없도록 보증한다.
  • Non-Busy Wait
    • 하나의 스레드가 mutex lock을 시도하는데 이미 다른 스레드 mutex lock을 사용하고 있을 때 이 스레드는 다른 스레드가 lock을 해제하기 전까지 해당 지점에 머물러 있으며, 이 동안은 어떠한 CPU 자원도 소비하지 않는다. (예를 들면 sleep)

예시 - 레스토랑 화장실

  • 화장실이 하나 밖에 없는 식당. 화장실을 가기 위해서는 카운터에서 열쇠를 받아 가야 한다.
  • 카운터에 키가 있다 → 화장실에 사람이 없다는 뜻. 열쇠를 들고 화장실에 들어갈 수 있다.
  • 카운터에 키가 없다 → 화장실을 누군가 사용하고 있다는 뜻. 열쇠가 없으니 화장실에 들어갈 수 없다. 누군가가 나올 때까지 카운터에서 기다려야 한다.
    • 누군가가 카운터에 키를 돌려놓았을 경우, 기다리는 사람들은 카운터에 도착한 순서대로 키를 받을 수 있다.
  • 화장실을 이용하는 사람 : 프로세스 or 스레드
  • 화장실 : 공유자원
  • 화장실 키 : 공유자원에 접근하기 위해 필요한 object

→ Mutex는 key에 해당하는 어떤 오브젝트가 있을 때 해당 object를 소유한 스레드/프로세스 만이 공유자원에 접근할 수 있다.

Semaphore

  • semaphore의 count를 1로 설정하면 mutex처럼 활용할 수 있다.

Mutex 코드 사용

Mutex 만들기

선언과 초기화를 위해 PTHREAD_MUTEX_INITIALIZER 상수를 할당

Mutex 정보는 pthread_mutex_t 타입에 저장한다.

pthread_mutex_t a_mutex = PTHREAD_MUTEX_INITIALIZER;

Mutex 코드 분석

kernel/locking/mutex.c

해당 파일에 존재하지 않는 코드들도 포함되어 있다. (mutex 코드를 이해하기 위해 필요했던 코드들)

/*
 * @owner: contains: 'struct task_struct *' to the current lock owner,
 * NULL means not owned. Since task_struct pointers are aligned at
 * at least L1_CACHE_BYTES, we have low bits to store extra state.
 *
 * Bit0 indicates a non-empty waiter list; unlock must issue a wakeup.
 * Bit1 indicates unlock needs to hand the lock to the top-waiter
 * Bit2 indicates handoff has been done and we're waiting for pickup.
 */

// owner가 NULL인 경우는 mutex가 누구에게도 소유되지 않은 것
// Bit0 : waiter list가 비어있지 않다.
// Bit1 : top-waiter에게 lock을 전달하기 위해 unlock이 필요하다.
// Bit2 : handoff가 마무리 되었고, pickup을 기다리고 있다.
#define MUTEX_FLAG_WAITERS	0x01
#define MUTEX_FLAG_HANDOFF	0x02
#define MUTEX_FLAG_PICKUP	0x04

// Bit를 알아내기 위해 bit & 연산하는 용도
#define MUTEX_FLAGS		0x07

__mutex_init 함수

void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
	atomic_long_set(&lock->owner, 0);  // lock의 owner를 0으로 초기화
	spin_lock_init(&lock->wait_lock);  // wait_lock (spin lock) 초기화
	INIT_LIST_HEAD(&lock->wait_list);  // wait_list (리스트) 초기화
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
	osq_lock_init(&lock->osq);
#endif

	debug_mutex_init(lock, name, key);
}

atomic operation으로 lock의 owner를 0으로 초기화한다.

wait_lock을 초기화한다. wait_lock은 wait_list를 위한 lock

wait_list 를 초기화한다. 

INIT_LIST_HEAD

static inline void INIT_LIST_HEAD(struct list_head *list) {
	list->next = list;
	list->prev = list;
}  // 리스트 초기화 함수

수업 시간에 배웠던 리스트 초기화 함수

__mutex_trylock_or_owner

static inline struct task_struct *__mutex_trylock_or_owner(struct mutex *lock)
{
	unsigned long owner, curr = (unsigned long)current;  // 현재 실행되고 있는 task

	owner = atomic_long_read(&lock->owner);  // lock의 owner 알아내기
	for (;;) { /* must loop, can race against a flag */
		unsigned long old, flags = __owner_flags(owner);  // owner와 MUTEX_FLAGS & 연산한 후 flag를 가져온다.
		unsigned long task = owner & ~MUTEX_FLAGS;  // 

		if (task) {  // task가 0b00000xxx가 아닌 경우
			if (likely(task != curr))  // 해당 스레드가 owner에 접근한 이후로 owner에 누군가 접근했는지 확인
// 즉 중간에 스레드가 owner 값을 변경하지 않은 경우 break
				break;

			if (likely(!(flags & MUTEX_FLAG_PICKUP))) // list가 비어있지 않은 경우 break
				break;

			flags &= ~MUTEX_FLAG_PICKUP;
		} else {
#ifdef CONFIG_DEBUG_MUTEXES
			DEBUG_LOCKS_WARN_ON(flags & MUTEX_FLAG_PICKUP);
#endif
		}

		/*
		 * We set the HANDOFF bit, we must make sure it doesn't live
		 * past the point where we acquire it. This would be possible
		 * if we (accidentally) set the bit on an unlocked mutex.
		 */
		flags &= ~MUTEX_FLAG_HANDOFF;

		old = atomic_long_cmpxchg_acquire(&lock->owner, owner, curr | flags);
		if (old == owner)
			return NULL;  // lock 얻는데 성공

		owner = old;
	}

	return __owner_task(owner);  // 현재 lock을 갖고 있는 owner 리턴
}

current 변수는 현재 실행되고 있는 task에 접근할 수 있다.

atomic operation으로 lock의 owner를 알아낸다.

owner와 MUTEX_FLAGS를 비트 & 연산한 후 결과값 (task)을 얻는다. task가 특정 값이 아닌 경우 해당 스레드가 owner에 접근한 이후로 아무도 접근하지 않았다면 반복문을 벗어난다.

list가 비어있지 않은 경우에도 break.

atomic_long_cmpxchg_acquire로 lock의 owner와 owner를 비교한 후, 같으면 lock을 얻는데 성공한 것

__mutex_trylock

static inline bool __mutex_trylock(struct mutex *lock)
{
	return !__mutex_trylock_or_owner(lock);  // mutex_trylock_or_owner가 NULL이면 true
}

__mutex_trylock_fast

경쟁하지 않는 경우에만 작동하면 optimistic trylock 함수

#ifndef CONFIG_DEBUG_LOCK_ALLOC
// 해당 설정이 되어 있을 때
/*
 * Optimistic trylock that only works in the uncontended case. Make sure to
 * follow with a __mutex_trylock() before failing.
 */
// 경쟁하지 않는 경우에만 작동하는 optimistic trylock
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
	unsigned long curr = (unsigned long)current;  // 현재 실행중인 task struct
	unsigned long zero = 0UL;

	if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))   // 
		return true;

	return false;
}
static __always_inline bool __mutex_unlock_fast(struct mutex *lock)
{
	unsigned long curr = (unsigned long)current;

	if (atomic_long_cmpxchg_release(&lock->owner, curr, 0UL) == curr)  // unlock_fast 성공
		return true;

	return false;
}
#endif

오직 atomic operation으로만 구성되어 있다.

mutex_waiter 함수들

static inline void __mutex_set_flag(struct mutex *lock, unsigned long flag)   // flag와 lock->owner or 연산?
{
	atomic_long_or(flag, &lock->owner);
}

static inline void __mutex_clear_flag(struct mutex *lock, unsigned long flag)   // flag와 lock->owner andnot 연산
{
	atomic_long_andnot(flag, &lock->owner);
}

static inline bool __mutex_waiter_is_first(struct mutex *lock, struct mutex_waiter *waiter)
{
// lock의 wait_list의 first struct가 입력 받은 waiter인지 확인
	return list_first_entry(&lock->wait_list, struct mutex_waiter, list) == waiter;
}

/*
 * Add @waiter to a given location in the lock wait_list and set the
 * FLAG_WAITERS flag if it's the first waiter.
 */
static void
__mutex_add_waiter(struct mutex *lock, struct mutex_waiter *waiter,
		   struct list_head *list)
{
	debug_mutex_add_waiter(lock, waiter, current);  // 현재 task를 lock에 의해 blocked_on 된 것으로 표시

	list_add_tail(&waiter->list, list);   // list의 끝에 list 추가
	if (__mutex_waiter_is_first(lock, waiter))
		__mutex_set_flag(lock, MUTEX_FLAG_WAITERS);
}

static void
__mutex_remove_waiter(struct mutex *lock, struct mutex_waiter *waiter)
{
	list_del(&waiter->list);  // 리스트에서 삭제
	if (likely(list_empty(&lock->wait_list)))
		__mutex_clear_flag(lock, MUTEX_FLAGS);

	debug_mutex_remove_waiter(lock, waiter, current);  
}

__mutex_add_waiter : waiter_list에 추가할 task를 lock에 의해 blocked_on 된 것으로 표시한다.

waiter->list에 추가하고, 첫 번째 waiter인 경우 flag를 추가한다.

__mutex_remove_waiter : 리스트에서 삭제한다. wait_list가 비었을 경우 flag를 삭제한다.

__mutex_lock, __mutex_lock_common

static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
	     struct lockdep_map *nest_lock, unsigned long ip)
{
	return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);  // Call mutex_lock_common from mutex_lock
}

/*
 * Lock a mutex (possibly interruptible), slowpath:
 */
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
		    struct lockdep_map *nest_lock, unsigned long ip,
		    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
	struct mutex_waiter waiter;
	...
	int ret;
	...
	might_sleep();  // Verify that the preempt_disable function is already invoked to disable preempt (if it is disabled, it causes error message output or kernel panic)

	...

	preempt_disable();   // Disable preemption
	...

	if (__mutex_trylock(lock) ||
	    mutex_optimistic_spin(lock, ww_ctx, NULL)) {   // If a process has acquired lock
		/* got the lock, yay! */
		lock_acquired(&lock->dep_map, ip);  // A function that occupies lock if there is no thread currently occupying lock; otherwise, it transfers priority to the thread
		...
		preempt_enable();  // If the lock is obtained, set it again so that it can be preempted.
		return 0;
	}

	// lock을 획득하지 못한 경우
	spin_lock(&lock->wait_lock);   // waitor list 보호를 위한 wait_lock

	/*
	 * After waiting to acquire the wait_lock, try again.
	 */
	if (__mutex_trylock(lock)) {  // mutex_lock을 얻을 수 있는 경우
		...
		goto skip_wait;
	}

	debug_mutex_lock_common(lock, &waiter);

	lock_contended(&lock->dep_map, ip);  // lock에 

	...

	waiter.task = current;   // waiter의 task에 current task 등록

	set_current_state(state);   // current->state에 입력받은 state 설정
	for (;;) {  // 무한루프
		bool first;

		/*
		 * Once we hold wait_lock, we're serialized against
		 * mutex_unlock() handing the lock off to us, do a trylock
		 * before testing the error conditions to make sure we pick up
		 * the handoff.
		 */
		if (__mutex_trylock(lock))   // mutex lock을 얻을 수 있는 경우
			goto acquired;

		/*
		 * Check for signals and kill conditions while holding
		 * wait_lock. This ensures the lock cancellation is ordered
		 * against mutex_unlock() and wake-ups do not go missing.
		 */
		...

		spin_unlock(&lock->wait_lock);  // waitor list unlock
		schedule_preempt_disabled();  // schedule_preemption disable

		first = __mutex_waiter_is_first(lock, &waiter);  // mutex waiter 중에서 제일 첫 번째 task인지 확인
		if (first)   // first인 경우 MUTEX_FLAG_HANDOFF flag marking
			__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);

		set_current_state(state);  // state 설정
		/*
		 * Here we order against unlock; we must either see it change
		 * state back to RUNNING and fall through the next schedule(),
		 * or we must see its unlock and acquire.
		 */
		if (__mutex_trylock(lock) ||
		    (first && mutex_optimistic_spin(lock, ww_ctx, &waiter)))  // 1. lock을 얻을 수 있거나 2. waitor list의 첫 번째이고 optimistic일 때
			break;

		// spin lock 다시 실행
		spin_lock(&lock->wait_lock);
	}
	spin_lock(&lock->wait_lock);

acquired:
	__set_current_state(TASK_RUNNING);

	...

	__mutex_remove_waiter(lock, &waiter);

	debug_mutex_free_waiter(&waiter);

skip_wait:
	/* got the lock - cleanup and rejoice! */
	lock_acquired(&lock->dep_map, ip);  // lock 획득

	...

	spin_unlock(&lock->wait_lock);  // waitor list spin_unlock
	preempt_enable();  // preemption 가능하도록
	return 0;

err:
	__set_current_state(TASK_RUNNING);
	__mutex_remove_waiter(lock, &waiter);
err_early_kill:
	spin_unlock(&lock->wait_lock);
	debug_mutex_free_waiter(&waiter);
	mutex_release(&lock->dep_map, 1, ip);
	preempt_enable();
	return ret;
}

발표용 코드라 코드상 생략된 부분이 있다.

__mutex_lock 함수에서 __mutex_lock_common 함수를 호출한다.

 

__mutex_lock__common

might_sleep() : preempt_disable 함수가 preemption을 막기 위해 불린 상황인지 검증한다. (중복을 막기 위함인 듯)

preempt_disable() : preemption disable

process가 lock을 얻은 경우, 현재 lock을 얻고자하는 스레드가 없을 때 lock을 얻는다. 만약 그렇지 않는다면 스레드에게 priority를 전달한다.

lock을 얻은 후에는 preemption을 다시 enable하게 바꾼다.

lock을 얻지 못한 경우에는 waitor list를 보호하기 위한 wait_lock (spinlock) 을 건다.

만약 wait_lock을 얻기 위해 기다린 후에, __mutex_trylock을 다시 시도한다. (얻을 수 있는 경우 skip_wait으로 이동)

waiter의 task에 current task를 등록하고, current->state 에 입력받은 state를 설정한다. 

무한 루프를 돌며 lock 을 얻을 수 있는지 반복적으로 확인한다. mutex lock을 얻을 수 있는 경우에는 acquired로 이동한다.

waitor list의 spinlock을 해제하고, scheduling preemption을 disable 한다. 현재 task가 mutex의 waitor 중에서 가장 첫 번째 task라면 HANDOFF flag를 설정하고, current_state를 현재 state로 변경한다. 

다시 lock을 얻을 수 있거나 waitor list의 첫 번째 이고 optimisitc인 경우 반복문을 탈출한다.

lock을 얻지 못했다면 다시 spin_lock을 실행한다.

include/asm-generic/atomic-long.h

static inline bool atomic_long_try_cmpxchg_acquire(atomic_long_t *v, long *old, long new) 
{
	return atomic_try_cmpxchg_acquire(v, (int*) old, new);
}

atomic_try_cmpxchg_acquire 함수는 컴파일러에 따로 구현되어 있다.

kernel/locking/mutex-debug.c

void debug_mutex_add_waiter(struct mutex *lock, struct mutex_waiter *waiter, struct task_struct *task)
{
        lockdep_assert_held(&lock->wait_lock);

        /* Mark the current thread as blocked on the lock: */
        task->blocked_on = waiter;
}

현재 thread가 lock에 의해 block 되었다는 것을 표시하는 함수

include/linux/mutex.h

struct mutex {
	atomic_long_t		owner;   // NULL if not owner
	spinlock_t		wait_lock;  // spinlock wait_lock to control access of wait_list
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
	struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
	struct list_head	wait_list;  // wait_queue
#ifdef CONFIG_DEBUG_MUTEXES
	void			*magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	struct lockdep_map	dep_map;
#endif
};
struct mutex {
	atomic_long_t		owner;
	spinlock_t		wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
	struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
	struct list_head	wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
	void			*magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	struct lockdep_map	dep_map;
#endif
};
/*
 * This is the control structure for tasks blocked on mutex,
 * which resides on the blocked task's kernel stack:
 */
// mutex에 의해 block된 task에 대한 control structure
// blocked task의 kernel stack에 존재
struct mutex_waiter {
	struct list_head	list;
	struct task_struct	*task;  // mutex를 누가 획득했는지?
	struct ww_acquire_ctx	*ww_ctx;
#ifdef CONFIG_DEBUG_MUTEXES
	void			*magic;
#endif
};

mutex와 관련된 구조체들이 정의되어 있다.

fastpath, slowpath

  • fastpath

뮤텍스는 다른 프로세스가 이미 획득하지 않은 상태면 바로 획득할 수 있습니다. 이 경우 fastpath 로 빠르게 뮤텍스를 획득하고 해제합니다.

  • slowpath

fastpath 코드 흐름으로 뮤텍스 획득을 시도했는데 다른 프로세스가 이미 뮤텍스를 획득한 경우 실행하는 동작

slowpath 동작은 크게 다음과 같이 나눌 수 있습니다.

  1. 뮤텍스를 획득하지 못한 프로세스는 대기열에 자신을 등록하고 휴면에 들어감
  2. 뮤텍스를 해제한 프로세스는 뮤텍스 대기열에 등록(뮤텍스 획득을 이미 시도)한 다른 프로세스를 깨움
728x90