0. 시작하며
https://nooroongzi.tistory.com/12
[SpringBoot] SSE알림과 비동기
0. 시작하며 프로젝트에서 SSE(Server Sent Event) 방식의 알림을 구현을 담당했습니다. 이때발생한 에러와 트러블 슈팅을 기록하려합니다. 1. 알림 구현 방식 후보군 1-1 Short Polling 클라이언트가 서버
nooroongzi.tistory.com
위 글에서 이어지는 글 입니다. SpringBoot에서의 비동기처리를할때 많이 고려되는 ConcurrentHashMap에대하여 알아보겠습니다.
1. ConcurrentHashMap 이란?
Multi-Thread 환경에서 동시성을 고려하여 최적화한 자료구조 입니다. 여러 스레드가 맵에 동시에 접근해도 성능이 저하되지 않도록 설계되었습니다. 비슷한 자료구조인 HahsTable, HashMap이 있습니다.
일정 범위 (버킷)단위로 여러 락을 사용하고 관리하여 동시에 여러 스레드가 접근할 수 있으면서도 충돌의 위험을 줄였습니다. 하지만 100% 완벽한 데이터 일관성을 제공하지는 않습니다.
- HashTable
- 전체에 synchronized키워드가 적용되어있습니다. 즉, 한번에 하나의 쓰레드가 작업을위해 접근할 수 있습니다.
- Thread-safe하다고 할 수 있지만 비효율적이고 병목현상이 발생될 수 있습니다.
- HashMap
- synchronized키워드가 적용되지 않아 RaceCondition이 발생할 수 있습니다.
- Multi-Thread환경에서는 사용하기 어렵다.
- ConcurrenthashMap
- 위 클래스들과 메서드기능면에서 큰 차이는 없지만 Thread-Safe하면서도 효율적인 자료구조이다.
- get() 메서드는 synchronized되어있지 않음 → RaceCondition 발생 가능
- put() 메서드의 경우 특정 범위에 대하여 synchronized되어 있음
참고로 synchronized 키워드는 Java에서 스레드 동기화를 위해 사용됩니다. 즉, 한 시점에 오직 하나의 스레드만이 특정 코드 블록이나 메서드에 접근할 수 있도록 제한하는 역할을 수행합니다.
2. Put()메서드
그렇다면 어떻게 Put()메서드가 동작하는지 내부 코드를 살펴보며 대략적으로 어떻게 실행되고, 어떻게 Thread-saft한가를 보도록 하겠습니다.
//putVal을 바로 호출함
public V put(K key, V value) {
return putVal(key, value, false);
}
public V putIfAbsent(K key, V value) {
return putVal(key, value, true);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
- 들어온key, value 값에대해 널체크를 하고 spread 함수에 key값의 hash코드로 hash변수값을 할당합니다.
- spread()함수를 통해 다양한 크기와 형태의 데이터를 고정된 크기의 정수 값으로 변환하는 과정을 다음과 같이 진행하게 됩니다.
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
//...
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS; // HASH_BITS = 0x7fffffff
}
- 이후 노드 배열을 반복문을돌며 각 노드를 감지하고 대응하는 과정을 거칩니다.
- 무한 루프처럼 보일 수 있지만 이후 등장할 조건문에 의해 적절히 종료됩니다.
for (Node<K,V>[] tab = table;;){//...}
- 각 상황에 맞는 작업을 진행하는데 먼저 테이블을 초기화하는 과정을 거칩니다.
if (tab == null || (n = tab.length) == 0) tab = initTable();
//...
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
- tabAt은 Unsafe 인스턴스를 사용하여 원자적 연산을 보장하는 메서드이다. tab객체의 특정 위치에서 기대하는 값(c),이 있는경우새로운값(v)으로 교체를 진행합니다. (블록킹 동작)
- Unsafe는 JDK내부적으로 사용되며 좀 더 low-level에 관여할 수 있는 객체인듯하다.. 일반적으로 사용되는것은 권장하지 않음 ( 추후 정리 )
- 아무튼 새로운 값을 추가하게됩니다.
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
//...
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
- HashMap처럼 ConcurrentHashMap또한 리사이징을 진행하게됩니다.
- 현재 노드의 hash값이 MOVED와 같으면 다른 스레드에서 리사이징이 진행중임을 의미합니다.
else if ((fh = f.hash) == MOVED) // MOVED = -1
tab = helpTransfer(tab, f);
//....
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
- 다음은 onlyIfAbsent 값과 여러 조건을 확인하고 모두 참인경우 현재 노드 값을 반환합니다.
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
- 위의 모든 상황에 해당하지 않을때 값의 수정이 아래와 같이 이루어집니다.
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
3. 마치며.
코드에 대한 상세한 이해가 부족합니다... 그럼에도 해당 자료구조가 어째서 Thread-safe하게 동작되는지에 대하여 알아 볼 수 있었습니다.
요약하자면이 구조는 반복문과 조건문(if 분기)을 사용하여 각 노드의 상태를 지속적으로 확인합니다. 이를 통해 각 노드가 현재 유효한 상태인지, 그리고 다른 스레드에 의해 어떠한 변경이 이루어지고 있는지를 지속적으로 감지합니다.
'Java' 카테고리의 다른 글
[JAVA] Builder 패턴을 알아보자 (0) | 2024.03.24 |
---|---|
[JAVA] 팩토리 메서드 패턴 (0) | 2024.03.21 |
[Java] CompletableFuture 사용하기 (0) | 2023.12.24 |
[Java] 디자인 패턴과 싱글톤 (0) | 2023.09.06 |
객체 지향 프로그래밍 (0) | 2023.08.30 |