作为一名数据工程师,我一直渴望开发能够驱动数据科学和商业智能的数据驱动型应用程序。我希望构建能够增加价值并提供竞争优势的产品和服务,因为数据正日益成为明智决策的来源。
我了解到准确和实时的信息是关键驱动因素。
我还看到一些组织缺乏关键数据,并且无法真正理解其业绩、行业等情况。他们往往基于不准确的信息或凭感觉做出决策。
与此同时,那些充分利用其数据的公司正在更好地了解其市场、业务和竞争对手的地位。这类信息能建立信心。它是一种竞争优势。
数据管道是如何工作的?
那么,组织如何保持其数据的最新状态并朝着这种优势迈进呢?
数据管道……是分析成功的基石。
通常,数据驱动型公司会聘请数据工程师/架构师来在其基础设施中实施作为数据管道的提取-转换-加载 (ETL) 工具。
但他们具体是如何做到的呢?
构建数据管道的步骤有哪些?
数据管道通常包含几个步骤,包括从源头提取数据、数据预处理、验证以及将数据加载到目标位置。让我们来看一个简单的例子。
在此用例中,我将从黑胶唱片市场刮取数据,并希望对这些商品的定价进行分析。

构建数据管道的过程
这里的第一个步骤是使用我制作的 Python 脚本,通过 API 或数据库提取我们所需的数据。
这很棒,因为现在我拥有数据了!
问题是它并不完全是我想要的格式,因此我需要进行一些转换才能获得我期望的数据集。
完成此操作后,我就可以将其加载到像 AWS S3 或 Azure Blob 这样的暂存区域进行数据存储。我称之为暂存区域,是因为我想将其用作到达最终目的地的垫脚石。
为了确保其可靠性,我需要构建一些测试、警报和备份计划,以防出现问题或花费时间过长。最后,我的数据仓库将识别我暂存区域中的新记录,并摄取新数据,以便为组织中的分析师和数据科学家维护一个最新的数据集。
搞定!现在我的报表和机器学习模型已连接到这个最终数据源,并且该数据源正以我选择的速率被馈送数据!管道已就绪,已部署,而且我再也不需要触摸它们或手动摄取数据了(希望如此)。
这仅仅是一个数据源,但我现在可以找到其他数据源,并查看我是否可以通过其他管道引入外部数据源,从而使我的数据更强大,并继续建立数据管道的竞争优势。请查看下面的代码以更深入地了解 ETL 代码。
import psycopg2
import csv
import boto3
import configparser
import os
import pandas as pd
from bs4 import BeautifulSoup
import requests
from time import time
from datetime import datetime
# config credentials from env
access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
bucket_name = 'discog-data'
# scrape data
startTime = time()
url = '...'
page = requests.get(url)
soup = BeautifulSoup(page.content, 'html.parser')
results = soup.find(id="pjax_container")
record_elements = results.find_all("tr", class_="shortcut_navigable")
item_list = []
price_list = []
sellers_list = []
total_price_list = []
link_list = []
for record_element in record_elements:
item_description = (record_element.find("a", class_="item_description_title"))
item_list.append(item_description.text)
price = (record_element.find("span", class_="price"))
price_list.append(price.text)
seller = record_element.find(lambda tag: tag.name == 'a' and tag.get('href') and tag.text and '/seller/' in tag.get('href'))
sellers_list.append(seller)
total_price = (record_element.find("span", class_="converted_price"))
total_price_list.append(total_price)
record_info = record_element.find(lambda tag: tag.name == 'a' and tag.get('href') and '/sell/' in tag.get('href'))
link = record_element.find("a", class_="item_description_title", href=True)
link_list.append(link['href'])
# create cols from item description
artists = [str(item).split('-')[0].rstrip() for item in item_list]
albums = [str(item).split('-')[1].lstrip() for item in item_list]
album_class = [str(item[item.find("(")+1:item.find(")")]) for item in item_list]
total_price_list = [item.text if item is not None else '0' for item in total_price_list]
my_dict = {'item_description': item_list,
'artists': artists,
'album': albums,
'album_class': album_class,
'seller': sellers_list,
'price': price_list,
'total price': total_price_list}
filename = f'discogs_market_data_{datetime.now().strftime("%Y%m%d-%H%M")}.csv'
df = pd.DataFrame(my_dict)
df.to_csv(filename, index=False)
# load file to S3
s3 = boto3.client('s3',
aws_access_key_id=access_key_id,
aws_secret_access_key=aws_secret_access_key)
s3_file = filename
s3.upload_file(filename, bucket_name, s3_file)
print(f' execution time: {(time() - startTime)}')