การเชื่อมต่อด้วยภาษา Python
การเชื่อมต่อผ่าน ODBC
ติดตั้งและตั้งค่าโปรแกรม ODBC for Windows ให้เรียบร้อย
เขียนโปรแกรม Python เพื่อเชื่อมต่อกับ Data Platform
import pyodbc
# Create an ODBC connection to Data Platform via pre-configured DSN (Data Source Name)
cnxn = pyodbc.connect(DSN='TDV', execution_options={'stream_results': True})
# Get all tables
# You'll get a list of tuples with the following structure:
# (table_cat, table_schem, table_name, table_type, remarks)
tables = [table for table in cnxn.cursor().tables()]
# Get table names
table_names = [table.table_name for table in tables]
# Show first five tables
table_names[:5]
ตัวอย่างการใช้งาน ODBC
หลังจากเชื่อมต่อเรียบร้อยแล้วคุณสามารถใช้ pandas
หรือ Data Processing Library อื่นๆในการประมวลผลข้อมูลได้ทันที มีตัวอย่างที่ใช้งานบ่อยดังนี้
แสดงรายการ Tables
import pyodbc
# Connect using DSN (Data Source Name)
cnxn = pyodbc.connect(DSN='TDV', execution_options={"stream_results": True})
# Get all tables
# You'll get a list of tuples with the following structure:
# (table_cat, table_schem, table_name, table_type, remarks)
tables = [table for table in cnxn.cursor().tables()]
# Get table names
table_names = [table.table_name for table in tables]
# Show first five tables
table_names[:5]
ค้นหาข้อมูลจากชื่อตาราง
# Get all tables with name containing 'happy'
[name for name in table_names if 'happy' in name]
ค้นหาข้อมูลจากกลุ่มข้อมูล
[table.table_name for table in tables if 'External' in table.table_cat]
ค้นหาข้อมูลจากหมวดหมู่
[table.table_name for table in tables if 'ข้อมูลผลตรวจสุขภาพ' in table.table_schem]
Query ข้อมูล Sample จากตาราง
ใช้คำสั่ง top
ในการระบุจำนวนแถวที่ต้องการให้ Data Platform ส่งข้อมูล
import pandas as pd
df = pd.read_sql('SELECT TOP 10 * FROM happy', cnxn)
df
Query ข้อมูลทั้งหมดจากตาราง
import pandas as pd
df = pd.read_sql('SELECT * FROM happy', cnxn)
df
การประมวลผลบน Data Platform
ใช้ในกรณีที่ต้องการใช้ทรัพยากรเครื่อง Server ของ Data Platform ในการกรองและประมวลผลข้อมูล
# Create SQL Statement
sql_stmt = """
SELECT department, AVG(score) as average
FROM happy
GROUP BY department
ORDER BY average ASC
"""
# Submit the SQL query
happiness_rank = pd.read_sql(sql_stmt, cnxn)
happiness_rank[:5]
การอ่านข้อมูลที่ขนาดมากกว่า RAM
ในกรณีที่ข้อมูลที่ต้องใช้เยอะกว่าหน่วยความจำเครื่อง (RAM) หรือต้องการประมวลผลเป็นส่วนเพื่อลดการใช้งานทรัพยากรเครื่อง หนึ่งในวิธีแก้ปัญหาคือ การอ่านเป็นส่วนๆ (chunk) ซึ่งคุณสามารถกำหนดปริมาณของข้อมูลด้วย chunksize
หรือใช้โมดูลที่ทำการประมวลข้อมูลขนาดใหญ่โดยตรง เช่น Spark
# amr_load_profile_201902 contains about 18 million rows.
# pandas is not very good at multicore processing so this will take about 2 minutes on a single core processing.
# This latency includes: server processing time + network time + data processing time
# You should consider multi-core data processing libraries such as modin, dask or spark etc.
results = []
chunks = pd.read_sql('SELECT date_stamp, export_demand_kw FROM amr_load_profile_201902', cnxn, chunksize=1_000_000)
for chunk in chunks:
processed = chunk.groupby('date_stamp')['export_demand_kw'].agg(['sum', 'count'])
results.append(processed)
combined_chunks = pd.concat(results).groupby('date_stamp').sum()
combined_chunks['mean'] = combined_chunks['sum']/combined_chunks['count']
combined_chunks[['mean']]
การประมวลผลข้อมูลขนาดใหญ่ผ่าน ODBC
ไม่แนะนำสำหรับข้อมูลขนาดใหญ่มาก เนื่องจากมี Tools ที่เหมาะสมมากกว่า เช่น Apache Spark
%%time
# This will take about 1 minute.
# This latency includes: server processing time + network transfer time + ODBC driver performance + local processing time.
sql_stmt = '''
SELECT date_stamp, AVG(export_demand_kw) AS average_demand
FROM amr_load_profile_201902
GROUP BY date_stamp
ORDER BY date_stamp ASC
'''
average_kw = pd.read_sql(sql_stmt, cnxn)
average_kw
การเชื่อมต่อผ่าน REST API
ระบบ Data Platform อยู่ระหว่างจัดทำ REST API สำหรับชุดข้อมูลที่มีในระบบ
API Catalog
ระบบ Data Platform อยู่ระหว่างการพัฒนา API Catalog และ Documentation เพื่อเพิ่มความสะดวกในการใช้งาน API โดยปัจจุบันคุณสามารถตรวจสอบชุดข้อมูลที่ให้บริการ API ได้ที่ Data Catalog
Endpoint URL มีโครงสร้างดังนี้
http://data.mea.or.th:9400/api/v1/{GROUP}/{SERVICE}
หัวข้อ ค่าที่กรอก รายละเอียด GROUP ชื่อกลุ่มข้อมูล ตรวจสอบได้ที่ Data Catalog เช่น customer
SERVICE ชื่อ Service การให้บริการข้อมูล เช่น getByMeterDistrict
(ข้อมูล Customers ตามเขต)สร้างไฟล์ชื่อว่า
.env
ใหม่โดยกรอกรายละเอียดusername
และpassword
เพื่อใช้เชื่อมต่อกับ REST API โดยระบบ Data Platform ใช้username
และpassword
จากระบบ AD ของ กฟน. (ชุดเดียวกับที่ใช้งาน WiFi)user=your_username
pass=your_passwordUsername & Password
คุณไม่ควรเก็บข้อมูล Username และ Password ลงบน Python Source File คุณสามารถใช้ Environment Variables หรือ Configuration Files ช่วยในการเก็บข้อมูลอ่อนไหว หากคุณใช้ Version Control คุณควรจะ Exclude ไฟล์ที่มี Username และ Password ก่อนการ Commit (เช่น .gitignore ของ git)
เขียนโปรแกรม Python เพื่อดึงข้อมูลผ่าน REST API
import requests
import os
import pandas as pd
from dotenv import load_dotenv
# Endpoint URL
URL = f'http://data.mea.or.th:9400/api/v1/customers/getByMeterDistrict?district={district}&page={page}&pageSize={pageSize}'
# Load username and password from dotfiles
load_dotenv()
username = os.environ.get('user')
password = os.environ.get('pass')
# Create Authentication object to perform HTTP Authentication
auth = requests.auth.HTTPBasicAuth(username, password)
# Call the endpoint
response = requests.get(URL, auth=auth)
# Parse results and build pandas DataFrame
if response.status_code == 200:
rbd_customers = pd.DataFrame(response.json().get('getByMeterDistrictResult'))
ตัวอย่างการดึงข้อมูล Proxy Logs
import requests
import os
import pandas as pd
from dotenv import load_dotenv
# Input parameters & URL
URL = f'http://data.mea.or.th:9400/api/v1/logs/getProxyLogs'
# Load username and password from dotfiles
load_dotenv()
username = os.environ.get('user')
password = os.environ.get('pass')
# Create Authentication object to perform HTTP Authentication
auth = requests.auth.HTTPBasicAuth(username, password)
# Call the endpoint
response = requests.get(URL, auth=auth)
# Parse results and build pandas DataFrame
if response.status_code == 200:
proxy = pd.DataFrame(response.json().get('getProxyLogsResult'))