作为一个数据工程师,我始终想要开发数据驱动的应用程序,以推动数据科学和商业智能。我想构建具有价值并提供竞争优势的产品和服务,因为数据正成为智能决策的来源。
我了解到准确和实时的数据是关键驱动力。
我也见过一些组织缺失数据,并且对性能、行业等没有真正的理解。他们往往基于不准确的信息或者他们感觉正确的事情来做决定。
与此同时,那些利用数据的公司正在更好地理解他们的市场、业务和竞争对手的位置。这种类型的信息增强了信心。这是一项竞争优势。
数据管道是如何工作的?
那么,一个组织如何保持其数据最新并努力实现这一优势呢?
数据管道……是数据分析成功的基础。
一般来说,数据驱动型公司会雇佣数据工程师/架构师,在他们的基础设施中实施数据管道,即提取-转换-加载(ETL)工具。
但他们具体是如何做的呢?
构建数据管道的步骤有哪些?
数据管道将包括几个步骤,包括从源提取数据、数据预处理、验证和数据的目的地。让我们来看一个简单的例子。
在这个用例中,我从一家黑胶唱片市场抓取数据,并想对这些商品的价格进行分析。
构建数据管道的过程
这里的第一个步骤是通过我制作的Python脚本来调用API或数据库,并提取所需的数据。
太棒了,因为我现在有数据了!
问题是它不是我所需要的格式,所以我需要进行一些转换以获得所需的dataset。
完成这些后,我可以将其加载到一个临时区域,如AWS S3或Azure Blob进行数据存储。我称之为临时区域,因为我希望将其作为最终目的地的垫脚石。
为了确保其可靠性,我想要建立一些测试、警报和备份计划,以防出现错误或耗时过长的情况。最后,我的数据仓库将识别临时区域中的新记录,并摄取新数据,以保持组织内分析师和数据科学家的新鲜dataset。
砰!现在,我的报告和机器学习模型连接到了这个最终数据源,它以我所选择的速度提供数据!管道已经到位,部署完成,我不再需要手动接触它们或摄取数据(希望如此)。
这只是一个数据源,但现在我可以寻找其他来源,看看是否可以通过其他管道引入外部数据源,以增强我的数据并继续构建数据管道的竞争优势。查看下面的代码,以深入了解ETL代码。
import psycopg2
import csv
import boto3
import configparser
import os
import pandas as pd
from bs4 import BeautifulSoup
import requests
from time import time
from datetime import datetime
# config credentials from env
access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
bucket_name = 'discog-data'
# scrape data
startTime = time()
url = '...'
page = requests.get(url)
soup = BeautifulSoup(page.content, 'html.parser')
results = soup.find(id="pjax_container")
record_elements = results.find_all("tr", class_="shortcut_navigable")
item_list = []
price_list = []
sellers_list = []
total_price_list = []
link_list = []
for record_element in record_elements:
item_description = (record_element.find("a", class_="item_description_title"))
item_list.append(item_description.text)
price = (record_element.find("span", class_="price"))
price_list.append(price.text)
seller = record_element.find(lambda tag: tag.name == 'a' and tag.get('href') and tag.text and '/seller/' in tag.get('href'))
sellers_list.append(seller)
total_price = (record_element.find("span", class_="converted_price"))
total_price_list.append(total_price)
record_info = record_element.find(lambda tag: tag.name == 'a' and tag.get('href') and '/sell/' in tag.get('href'))
link = record_element.find("a", class_="item_description_title", href=True)
link_list.append(link['href'])
# create cols from item description
artists = [str(item).split('-')[0].rstrip() for item in item_list]
albums = [str(item).split('-')[1].lstrip() for item in item_list]
album_class = [str(item[item.find("(")+1:item.find(")")]) for item in item_list]
total_price_list = [item.text if item is not None else '0' for item in total_price_list]
my_dict = {'item_description': item_list,
'artists': artists,
'album': albums,
'album_class': album_class,
'seller': sellers_list,
'price': price_list,
'total price': total_price_list}
filename = f'discogs_market_data_{datetime.now().strftime("%Y%m%d-%H%M")}.csv'
df = pd.DataFrame(my_dict)
df.to_csv(filename, index=False)
# load file to S3
s3 = boto3.client('s3',
aws_access_key_id=access_key_id,
aws_secret_access_key=aws_secret_access_key)
s3_file = filename
s3.upload_file(filename, bucket_name, s3_file)
print(f' execution time: {(time() - startTime)}')