Chapter 10 - 스파크 SQL
포스트
취소

Chapter 10 - 스파크 SQL

Chapter 10 - Spark SQL

Spark sql

  • Spark sql은 DB에서 생성된 View나 Table에 SQL 질의문을 실행할 수 있으며
  • 시스템 함수나 사용자 정의 함수를 사용할 수도 있다.
  • Ansi-sql과 HiveQL을 모두 지원하는 자체 개발된 SQL Parser가 포함되어 있다.

Spark sql과 Hive와의 관계

  • Spark SQL은 Hive Metastore를 사용하기 때문에 Hive와 용이하다.
  • Spark SQL에서는 조회활 파일 수를 최소화하기 위해 Hive Metastore에 접속한 후 Metadata를 참조한다.

Spark sql Thrift JDBC/ODBC 서버

  • Spark는 자바 데이터베이스 연결 인터페이스를 제공한다.
  • Thrift JDBC/ODBC 서버는 HiveServer2 기반으로 만들어졌다.
  • 사용자는 Thrift JDBC/ODBC 서버를 경유해 sql문을 실행할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Thrift 서버 실행
$SPARK_HOME/sbin/start-thriftserver.sh

# 환경변수를 통해 Thrift 서버의 주소를 변경할 수 있다.
# spark-submit에서 지원하는 모든 명령행 옵션을 사용할 수 있다.
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
$SPARK_HOME/sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

$SPARK_HOME/sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-poart> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri> \
  ...

Beeline으로 접속

1
2
3
beeline

beeline> !connect jdbc:hive2://localhost:10000

테이블 생성하기

  • Using구문을 사용하지 않을 시 Hive의 SerDe설정을 사용하게 되므로 Spark자체 직렬화보다 훨씬 느리다
    1
    2
    3
    4
    5
    6
    7
    
    CREATE TABLE flights (
    DEST_COUNTRY_NAME STRING,
    ORIGIN_COUNTRY_NAME STRING,
    count LONG
    )
    USING JSON
    OPTIONS (path './2015-summary.json')
    

복합 데이터 타입

표준 SQL과는 거리가 먼 기능 구조체, 리스트, 맵 타입이 존재한다.

구조체

구조체는 맵에 더 까가운 복합 데이터 타입이다. 여러 컬럼이나 표현식을 괄호로 묶으면 생성된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 구조체 생성
CREATE VIEW IF NOT EXISTS nested_data
AS
SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count
FROM flights

# 구조체 조회
SELECT * FROM nested_data

# 구조체의 개별 칼럼 조회
SELECT country.DEST_COUNTRY_NAME, count FROM nested_data

# 구조체의 모든 개별 칼럼 조회
SELECT country.*, count FROM nested_data

리스트

collectlist, collectset 함수를 통해 생성할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#리스트 조희
SELECT DEST_COUNTRY_NAME AS new_name, collect_list(count)
AS flight_counts, collect_set(ORIGIN_COUNTRY_NAME) AS origin_set
FROM flights
GROUP BY DEST_COUNTRY_NAME

# 리스트의 특정 위치의 아이템 조회
SELECT DEST_COUNTRY_NAME AS new_name,
collect_list(count)[0] AS flight_counts, collect_set(ORIGIN_COUNTRY_NAME) AS origin_set
FROM flights
GROUP BY DEST_COUNTRY_NAME

# explod 함수를 이용한 배열을 로우로 만들기
CREATE OR REPLACE TEMP VIEW flights_agg AS
SELECT DEST_COUNTRY_NAME, collect_list(count) AS collected_counts
FROM flights
GROUP BY DEST_COUNTRY_NAME

# collect 함수와 정확히 반대로 동작한다.
# collect 수행 이전의 DataFrame 동일한 결과를 반환한다.
SELECT explode(collected_counts), DEST_COUNTRY_NAME
FROM flights_agg

스파크 sql 환경 설정

속성명기본값의미
spark.sql.inMemoryColumnarStorage.compressedTRUE이 값을 true로 설정하면 스파크 sql은 데이터 통계를 기반으로 각 컬럼에 대한 압축 코덱을 자동으로 선택
spark.sql.inMemoryColumnarStorage.batchSize10000컬럼 기반의 캐싱에 대한 배치 크기를 조절. 배치 크기가 클수록 메모리 사용량과 압축 성능이 향상되지만, 데이터 캐싱 시 OutOfMemoryError(OOM)가 발생할 위험이 있습니다.
spark.sql.files.maxPartitionBytes134217728 (128MB)파일을 읽을 때 단일 파티션에 할당할 수 있는 최대 바이트수를 정의합니다
spark.sql.files.openCostInBytes4194304 (4MB)동시에 스캔할 수 있는 바이트 수. 파일을 여는 데 드는 예상 비용을 측정하는 데 사용됩니다. 이 값은 다수의 파일을 파티션에 넣을 때 사용됩니다. 작은 파일을 많이 가진 파티션이 더 큰 파일 (먼저 작업이 예정된 파일)을 가진 파티션보다 더 좋은 성능을 낼 수 있도록 넉넉한 값을 설정하는 편이 좋습니다.
spark.spl.broadcastTimeout300브로드캐스트 조인 시 전체 워커노드로 전파할 때 기다릴 시간을 초(second)단위로 정의합니다.
spark.sql.autoBroadcastJoinThreshold10485760 (10MB)조인 시 전체 워커 노드로 전파될 테이블의 최대 크기를 바이트 단 위로 설정합니다. 이 값을 -1로 설정하면 브로드캐스트 조인을 비활성화합니다. 현재 ANALYZE TABLE COMPUTE STATISTICS noscan 명령이 실행된 하이브 메타스토어에 정의된 테이블만 통계를 사용할 수 있습니다.
spark.sql.shuffle.partitions200조인이나 집계 수행에 필요한 데이터를 셔플링할 때 사용할 파티션 수를 설정합니다.
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.