Chapter 9 - 데이터 소스
포스트
취소

Chapter 9 - 데이터 소스

Chapter 9 - 데이터 소스

JSON 파일 읽고 쓰기

1
2
3
4
5
spark.read.format('json').option('mode', 'FAILFAST')\
      .option('inferSchema', 'true')\
      .load('dir').show(5)

 csvFile.write.format('json').mode('overwrite').save('dir')

Parquet 형식 읽고 쓰기

parquet형식은 json이나 csv보다 읽기 연산시 훨씬 효율적으로 동작하므로 장기 저장용 데이터는 parquet형식으로 저장하는 것이 좋음. json이나 csv에 비해 옵션이 거의 없음. orc형식도 마찬가지고 둘 사이에 차이는 거의 없지만 spark에 최적화 된 것은 parquet. hive는 orc

1
2
3
4
5
 spark.read.format('parquet')\
      .load('dir').show()

 csvFile.write.format('parquet').mode('overwrite')\
        .save('dir')

Database 읽고 쓰기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
driver = 'org.sqlite.JDBC'
path = 'dir'
url = 'jdbc:sqlite:' + path
tableName = 'target'

# sql테이블을 읽어 dataframe화. sqlite
dbDataFrame = spark.read.format('jdbc').option('url', url)\
                        .option('dbTable', tableName).option('driver', driver).load()


# postgreSQL에서 테이블을 읽어 dataframe화
# 좀 더 많은 옵션 필요
pgDF = spark.read.format('jdbc')\
                 .option('driver', 'org.postgresql.driver')\
                 .option('url', 'jdbc:postgresql://database_server')\
                 .option('dbTable', 'schema.tableName')\
                 .option('user', 'username').option('password', 'my-secret-password').load()

query push down

spark에서 dataframe을 만들기 전에 데이터베이스 자체에서 데이터를 필터링하도록 하는 방식 연결된 db에서 데이터가 필터링 된 후 필터링 된 데이터로 spark dataframe을 만듦

1
2
3
4
5
6
pushDownQuery = """(select distinct(dest_country_name) from flight_info) as flight_info"""
dbDataFrame = spark.read.format('jdbc')\
                        .option('url', url)\
                        .option('dbTable', pushDownQuery)\
                        .option('driver', driver)\
                        .load()

데이터베이스 병렬로 읽기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# option에 numPartitions 추가

dbDataFrame = spark.read.format('jdbc')\
                   .option('url', url)\
                   .option('dbTable', tablename)\
                   .option('driver', driver)\
                   .option('numPartitions', 10).load()

# 필터를 db에 위임할 수 있지만(push down), spark 자체 partition에 결과 데이터를 저장함으로써 더 많은 처리 가능
# 데이터소스 생성 시 조건절 목록을 정의해서 spark 자체 partition에 결과 데이터 저장 가능
props = {'drivers':'org.sqlite.JDBC'}
predicates = [
    "dest_country_name = 'Sweden' or origin_country_name = 'Sweden'",
    "dest_country_name = 'Anguilla' or origin_country_name = 'Anguilla'"
]
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).show()
spark.read.jdbc(url, tablename, predicates=predicates, properties=props)\
          .rdd.getNumPartitions() # 2

SQL쓰기

1
2
3
4
5
newPath = 'jdbc:sqlite://tmp/my-sqlite.db'
props = {'drivers':'org.sqlite.JDBC'}

csvFiles.write.jdbc(newPath, tablename, mode='overwrite', properties=props)
csvFiles.write.jdbc(newPath, tablename, mode='append', properties=props)

텍스트파일 읽고 쓰기

1
2
3
4
5
6
7
8
9
10
spark.read.textFile("/data/spark/data/flight-data/csv/2010-summary.csv")\
          .selectExpr("split(value, ',') as rows").show()

# 텍스트파일을 쓸(write) 때에는 문자열 컬럼이 하나만 존재해야 함.
csvFiles.select('dest_country_name').write.text('/tmp/simple-txt-file.txt')

# 텍스트 파일에 데이터를 저장할 때 파티셔닝 작업을 수행하면 더 많은 컬럼 저장 가능. 하지만 모든 파일에 컬럼을 추가하는 것이 아닌
# 텍스트 파일이 저장되는 디렉터리에 폴더별로 컬럼 저장
csvFiles.limit(10).select('dest_country_name', 'count')\
                  .write.partitionBy('count').text('/tmp/five-csv-files2py.csv')

파티셔닝

1
2
3
4
# 나라별로 파티셔닝된 데이터들 각 조건에 맞는 디렉터리에 저장됨
# 파티셔닝은 필터링을 자주 사용하는 테이블을 가진 경우에 사용 가능한 가장 손쉬운 최적화 방식
csvFiles.limit(10).write.mode('overwrite').partitionBy('dest_country_name')\
                        .save('/tmp/partitioned-files.parquet')

버케팅(Bucketing)

1
2
3
4
5
6
# 특정 컬럼을 파티셔닝하면 수억개의 디렉터리를 만들어 낼 수 있음.
# 이런 경우 데이터를 모아서 저장하는 방법이 필요한데 이를 버케팅이라 함
# 이후의 사용방식에 맞춰 사전에 파티셔닝되므로 조인이나 집계 시 발생하는 고비용의 셔플을 피할 수 있음

csvFiles.write.format('parquet').mode('overwrite')\
        .bucketBy(numberBuckets, columnToBucketBy).saveAsTable('bucketedFiles')
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.