Chapter 10 - Spark SQL
Spark sql
- Spark sql은 DB에서 생성된 View나 Table에 SQL 질의문을 실행할 수 있으며
- 시스템 함수나 사용자 정의 함수를 사용할 수도 있다.
- Ansi-sql과 HiveQL을 모두 지원하는 자체 개발된 SQL Parser가 포함되어 있다.
Spark sql과 Hive와의 관계
- Spark SQL은 Hive Metastore를 사용하기 때문에 Hive와 용이하다.
- Spark SQL에서는 조회활 파일 수를 최소화하기 위해 Hive Metastore에 접속한 후 Metadata를 참조한다.
Spark sql Thrift JDBC/ODBC 서버
- Spark는 자바 데이터베이스 연결 인터페이스를 제공한다.
- Thrift JDBC/ODBC 서버는 HiveServer2 기반으로 만들어졌다.
- 사용자는 Thrift JDBC/ODBC 서버를 경유해 sql문을 실행할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Thrift 서버 실행
$SPARK_HOME/sbin/start-thriftserver.sh
# 환경변수를 통해 Thrift 서버의 주소를 변경할 수 있다.
# spark-submit에서 지원하는 모든 명령행 옵션을 사용할 수 있다.
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
$SPARK_HOME/sbin/start-thriftserver.sh \
--master <master-uri> \
...
$SPARK_HOME/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-poart> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri> \
...
Beeline으로 접속
1
2
3
beeline
beeline> !connect jdbc:hive2://localhost:10000
테이블 생성하기
- Using구문을 사용하지 않을 시 Hive의 SerDe설정을 사용하게 되므로 Spark자체 직렬화보다 훨씬 느리다
1 2 3 4 5 6 7
CREATE TABLE flights ( DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count LONG ) USING JSON OPTIONS (path './2015-summary.json')
복합 데이터 타입
표준 SQL과는 거리가 먼 기능 구조체, 리스트, 맵 타입이 존재한다.
구조체
구조체는 맵에 더 까가운 복합 데이터 타입이다. 여러 컬럼이나 표현식을 괄호로 묶으면 생성된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 구조체 생성
CREATE VIEW IF NOT EXISTS nested_data
AS
SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count
FROM flights
# 구조체 조회
SELECT * FROM nested_data
# 구조체의 개별 칼럼 조회
SELECT country.DEST_COUNTRY_NAME, count FROM nested_data
# 구조체의 모든 개별 칼럼 조회
SELECT country.*, count FROM nested_data
리스트
collectlist, collectset 함수를 통해 생성할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#리스트 조희
SELECT DEST_COUNTRY_NAME AS new_name, collect_list(count)
AS flight_counts, collect_set(ORIGIN_COUNTRY_NAME) AS origin_set
FROM flights
GROUP BY DEST_COUNTRY_NAME
# 리스트의 특정 위치의 아이템 조회
SELECT DEST_COUNTRY_NAME AS new_name,
collect_list(count)[0] AS flight_counts, collect_set(ORIGIN_COUNTRY_NAME) AS origin_set
FROM flights
GROUP BY DEST_COUNTRY_NAME
# explod 함수를 이용한 배열을 로우로 만들기
CREATE OR REPLACE TEMP VIEW flights_agg AS
SELECT DEST_COUNTRY_NAME, collect_list(count) AS collected_counts
FROM flights
GROUP BY DEST_COUNTRY_NAME
# collect 함수와 정확히 반대로 동작한다.
# collect 수행 이전의 DataFrame과 동일한 결과를 반환한다.
SELECT explode(collected_counts), DEST_COUNTRY_NAME
FROM flights_agg
스파크 sql 환경 설정
속성명 | 기본값 | 의미 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | TRUE | 이 값을 true로 설정하면 스파크 sql은 데이터 통계를 기반으로 각 컬럼에 대한 압축 코덱을 자동으로 선택 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 컬럼 기반의 캐싱에 대한 배치 크기를 조절. 배치 크기가 클수록 메모리 사용량과 압축 성능이 향상되지만, 데이터 캐싱 시 OutOfMemoryError(OOM)가 발생할 위험이 있습니다. |
spark.sql.files.maxPartitionBytes | 134217728 (128MB) | 파일을 읽을 때 단일 파티션에 할당할 수 있는 최대 바이트수를 정의합니다 |
spark.sql.files.openCostInBytes | 4194304 (4MB) | 동시에 스캔할 수 있는 바이트 수. 파일을 여는 데 드는 예상 비용을 측정하는 데 사용됩니다. 이 값은 다수의 파일을 파티션에 넣을 때 사용됩니다. 작은 파일을 많이 가진 파티션이 더 큰 파일 (먼저 작업이 예정된 파일)을 가진 파티션보다 더 좋은 성능을 낼 수 있도록 넉넉한 값을 설정하는 편이 좋습니다. |
spark.spl.broadcastTimeout | 300 | 브로드캐스트 조인 시 전체 워커노드로 전파할 때 기다릴 시간을 초(second)단위로 정의합니다. |
spark.sql.autoBroadcastJoinThreshold | 10485760 (10MB) | 조인 시 전체 워커 노드로 전파될 테이블의 최대 크기를 바이트 단 위로 설정합니다. 이 값을 -1로 설정하면 브로드캐스트 조인을 비활성화합니다. 현재 ANALYZE TABLE COMPUTE STATISTICS noscan 명령이 실행된 하이브 메타스토어에 정의된 테이블만 통계를 사용할 수 있습니다. |
spark.sql.shuffle.partitions | 200 | 조인이나 집계 수행에 필요한 데이터를 셔플링할 때 사용할 파티션 수를 설정합니다. |