Pandas
import pandas as pd
import numpy as np
Pandas Data Structures
s = pd.Series([3, -5, 7, 4], index=['a', 'b', 'c', 'd'])
data = {'Country': ['Belgium', 'India', 'Brazil'],
'Capital': ['Brussels', 'New Delhi', 'Brasília'],
'Population': [11190846, 1303171035, 207847528]}
df = pd.DataFrame(data,
columns=['Country', 'Capital', 'Population'])
# Pivot
data = {'Date': ['2016-03-01', '2016-03-02', '2016-03-01','2016-03-03','2016-03-02',
'2016-03-03'],
'Type': ['a', 'b', 'c','a','a','c'],
'Value': [11.432, 13.031, 20.784,99.906,1.303,20.784]}
df2 = pd.DataFrame(data,
columns=['Date', 'Type', 'Value'])
df3= df2.pivot(index='Date',
columns='Type',
values='Value')
print(df2)
print(df3)
Date Type Value
0 2016-03-01 a 11.432
1 2016-03-02 b 13.031
2 2016-03-01 c 20.784
3 2016-03-03 a 99.906
4 2016-03-02 a 1.303
5 2016-03-03 c 20.784
Type a b c
Date
2016-03-01 11.432 NaN 20.784
2016-03-02 1.303 13.031 NaN
2016-03-03 99.906 NaN 20.784
df4 = pd.pivot_table(df2,
values='Value',
index='Date',
columns=['Type'])
print(df4)
Type a b c
Date
2016-03-01 11.432 NaN 20.784
2016-03-02 1.303 13.031 NaN
2016-03-03 99.906 NaN 20.784
df5=pd.melt(df2,
id_vars=["Date"],
value_vars=["Type", "Value"],
value_name="Observations")
print(df5)
Date variable Observations
0 2016-03-01 Type a
1 2016-03-02 Type b
2 2016-03-01 Type c
3 2016-03-03 Type a
4 2016-03-02 Type a
5 2016-03-03 Type c
6 2016-03-01 Value 11.432
7 2016-03-02 Value 13.031
8 2016-03-01 Value 20.784
9 2016-03-03 Value 99.906
10 2016-03-02 Value 1.303
11 2016-03-03 Value 20.784
Dropping Data
s.drop(['a','c'])
df.drop('Country', axis=1)
|   | Capital   | Population |
|---|-----------|------------|
| 0 | Brussels  | 11190846   |
| 1 | New Delhi | 1303171035 |
| 2 | Brasília  | 207847528  |
Sorting & Ranking
df.sort_index()
df.sort_values(by='Country')
df.rank()
|   | Country | Capital | Population |
|---|---------|---------|------------|
| 0 | 1.0     | 2.0     | 1.0        |
| 1 | 3.0     | 3.0     | 3.0        |
| 2 | 2.0     | 1.0     | 2.0        |
Retrieving DataFrame Information
df.shape
df.index
df.columns
df.info()
df.count()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 3 columns):
Country 3 non-null object
Capital 3 non-null object
Population 3 non-null int64
dtypes: int64(1), object(2)
memory usage: 144.0+ bytes
Country 3
Capital 3
Population 3
dtype: int64
Summary Statistics
df.sum()
df.cumsum()
#df.min()/df.max()
#df.idxmin()/df.idxmax()
df.describe()
df.mean()
df.median()
Population 207847528.0
dtype: float64
Selection
s['b']
df[1:]
df.iloc[[0],[0]]
df.iat[0, 0]
#df.loc[[0], ['Country']]
#df.at[0, 'Country']
df.loc[2]            # .ix has been removed from pandas; use .loc (labels) or .iloc (positions)
df.loc[:, 'Capital']
df.loc[1, 'Capital']
#Boolean Indexing
s[~(s > 1)]
s[(s < -1) | (s > 2)]
df[df['Population']>1200000000]
s['a'] = 6
#Selecting
df3.loc[:,(df3>1).any()]
df3.loc[:,(df3>1).all()]
df3.loc[:,df3.isnull().any()]
df3.loc[:,df3.notnull().all()]
#Indexing With isin
df[(df.Country.isin(df2.Type))]
df3.filter(items=["a","b"])
df[df.index % 5 == 0]  # DataFrame.select has been removed; filter on the index instead
#Where
s.where(s > 0)
#Query
#df.query('second > first')
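The commented query above assumes a frame with columns named first and second; as a minimal sketch against the df2 defined earlier, a query could look like this:
# select rows of df2 whose Value column exceeds 20 (the expression is evaluated by query)
df2.query('Value > 20')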
#Setting/Resetting Index
df.set_index('Country')
df4 = df.reset_index()
df = df.rename(index=str,columns={"Country":"cntry","Capital":"cptl",
"Population":"ppltn"})
s2 = s.reindex(['a','c','d','e','b'])
ss= df.reindex(range(4),method='ffill')
print(ss)
cntry cptl ppltn
0 NaN NaN NaN
1 NaN NaN NaN
2 NaN NaN NaN
3 NaN NaN NaN
s3 = s.reindex(range(5),method='bfill')
print(s3)
0 6
1 6
2 6
3 6
4 6
dtype: int64
Aggregation
#Aggregation
df2.groupby(by=['Date','Type']).mean()
df4.groupby(level=0).sum()
print(df4)
#df4.groupby(level=0).agg({'Capital':lambda x:sum(x)/len(x), 'Population': np.sum})
#Transformation
#customSum = lambda x: (x+x%2)
#df4.groupby(level=0).transform(customSum)
index Country Capital Population
0 0 Belgium Brussels 11190846
1 1 India New Delhi 1303171035
2 2 Brazil Brasília 207847528
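The commented agg/transform calls above would likely fail on the string Capital column; as a minimal working sketch (not the original example), numeric aggregation is easier to show on df2 grouped by Type:
# one row per Type with several aggregates of Value
df2.groupby('Type')['Value'].agg(['mean', 'sum', 'count'])
# transform broadcasts the group mean back onto every original row
df2.groupby('Type')['Value'].transform('mean')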
Help
help(pd.Series.loc)
Applying Functions
f=lambda x:x*2
df.apply(f)
df.applymap(f)
|   | cntry          | cptl               | ppltn      |
|---|----------------|--------------------|------------|
| 0 | BelgiumBelgium | BrusselsBrussels   | 22381692   |
| 1 | IndiaIndia     | New DelhiNew Delhi | 2606342070 |
| 2 | BrazilBrazil   | BrasíliaBrasília   | 415695056  |
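apply passes whole columns (or rows) to the function, while applymap passes individual elements; with f = lambda x: x*2 the two results happen to coincide. A minimal sketch that makes the difference visible, using the renamed ppltn column from above:
# column-wise: the function receives each column as a Series and returns one value per column
df[['ppltn']].apply(lambda col: col.max())
# element-wise: the function receives each scalar cell
df[['ppltn']].applymap(lambda v: len(str(v)))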
Data Alignment
s3 = pd.Series([7, -2, 3], index=['a', 'c', 'd'])
s + s3
a 13.0
b NaN
c 5.0
d 7.0
dtype: float64
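Labels missing from either side become NaN under plain arithmetic; as a sketch, the flexible arithmetic methods take a fill_value that is substituted before the operation:
# treat missing labels as 0 (or 2) instead of propagating NaN
s.add(s3, fill_value=0)
s.sub(s3, fill_value=2)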
Input/Output
Reading and writing CSV
pd.read_csv('file.csv', header=None, nrows=5)
df.to_csv('myDataFrame.csv')
Reading and writing Excel
pd.read_excel('file.xlsx')
df.to_excel('dir/myDataFrame.xlsx', sheet_name='Sheet1')
Reading data from multiple sheets
xlsx = pd.ExcelFile('file.xls')
df = pd.read_excel(xlsx, 'Sheet1')
Reading from and writing to SQL queries or database tables
from sqlalchemy import create_engine
engine = create_engine('sqlite:///:memory:')
pd.read_sql("SELECT * FROM my_table;", engine)
pd.read_sql_table('my_table', engine)
pd.read_sql_query("SELECT * FROM my_table;", engine)
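The heading also covers writing; a minimal sketch of the write side via the same SQLAlchemy engine (the table name 'my_table' is just a placeholder):
# write df to a SQL table, replacing it if it already exists
df.to_sql('my_table', engine, if_exists='replace', index=False)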
Merging Data
dict1 = {'X1': ['a', 'b', 'c'],
'X2': ['11.432', '1.303', '99.906']}
dict2 = {'X1': ['a', 'b', 'd'],
'X3': ['20.784', 'NaN', '20.784']}
data1 = pd.DataFrame(dict1,
columns=['X1', 'X2'])
data2 = pd.DataFrame(dict2,
columns=['X1', 'X3'])
pd.merge(data1,
data2,
how='left',
on='X1')
|   | X1 | X2     | X3     |
|---|----|--------|--------|
| 0 | a  | 11.432 | 20.784 |
| 1 | b  | 1.303  | NaN    |
| 2 | c  | 99.906 | NaN    |
pd.merge(data1,
data2,
how='right',
on='X1')
|   | X1 | X2     | X3     |
|---|----|--------|--------|
| 0 | a  | 11.432 | 20.784 |
| 1 | b  | 1.303  | NaN    |
| 2 | d  | NaN    | 20.784 |
pd.merge(data1,
data2,
how='inner',
on='X1')
|   | X1 | X2     | X3     |
|---|----|--------|--------|
| 0 | a  | 11.432 | 20.784 |
| 1 | b  | 1.303  | NaN    |
pd.merge(data1,
data2,
how='outer',
on='X1')
|   | X1 | X2     | X3     |
|---|----|--------|--------|
| 0 | a  | 11.432 | 20.784 |
| 1 | b  | 1.303  | NaN    |
| 2 | c  | 99.906 | NaN    |
| 3 | d  | NaN    | 20.784 |
join
#help(df.join)
caller = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
other = pd.DataFrame({'key': ['K0', 'K1', 'K2'],
'B': ['B0', 'B1', 'B2']})
caller.join(other, lsuffix='_caller', rsuffix='_other')
print(caller)
#data1.set_index('X1')
#data2.set_index('X1')
#data1.join(data2, lsuffix='data1', rsuffix='data2', how='right')
A key
0 A0 K0
1 A1 K1
2 A2 K2
3 A3 K3
4 A4 K4
5 A5 K5
caller = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
other = pd.DataFrame({'key': ['K0', 'K1', 'K2'],
'B': ['B0', 'B1', 'B2']})
caller.set_index('key').join(other.set_index('key'))
print(caller)
A key
0 A0 K0
1 A1 K1
2 A2 K2
3 A3 K3
4 A4 K4
5 A5 K5
caller = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
other = pd.DataFrame({'key': ['K0', 'K1', 'K2'],
'B': ['B0', 'B1', 'B2']})
caller.join(other.set_index('key'), on='key')
print(caller)
A key
0 A0 K0
1 A1 K1
2 A2 K2
3 A3 K3
4 A4 K4
5 A5 K5
PySpark
Initializing Spark
#SparkContext
from pyspark import SparkContext
sc = SparkContext(master = 'local[2]')
#Calculations With Variables
sc.version
sc.pythonVer
sc.master
str(sc.sparkHome)
str(sc.sparkUser())
sc.appName
sc.applicationId
sc.defaultParallelism
sc.defaultMinPartitions
#Configuration
from pyspark import SparkConf, SparkContext
conf = (SparkConf().setMaster("local").setAppName("My app").set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
Loading Data
#Parallelized Collections
rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([("a",["x","y","z"]),("b",["p", "r"])])
#External Data
textFile = sc.textFile("/my/directory/*.txt")
textFile2 = sc.wholeTextFiles("/my/directory/")
Selecting Data
#Getting
rdd.collect() #[('a', 7), ('a', 2), ('b', 2)]
rdd.take(2) #[('a', 7), ('a', 2)]
rdd.first() #('a', 7)
rdd.top(2) #[('b', 2), ('a', 7)]
#Sampling
rdd3.sample(False, 0.15, 81).collect() #[3,4,27,31,40,41,42,43,60,76,79,80,86,97]
#Filtering
rdd.filter(lambda x: "a" in x).collect() #[('a',7),('a',2)]
rdd5.distinct().collect() #['a',2,'b',7]  (rdd5 is defined below, under Applying Functions)
rdd.keys().collect() #['a', 'a', 'b']
#Iterating
def g(x): print(x)
rdd.foreach(g)
('a', 7) ('b', 2) ('a', 2)
Retrieving RDD Information: Basics
rdd.getNumPartitions()
rdd.count()
rdd.countByKey()
rdd.countByValue()
rdd.collectAsMap()
rdd3.sum()
sc.parallelize([]).isEmpty()
Retrieving RDD Information: Summary
rdd3.max()
rdd3.min()
rdd3.mean()
rdd3.stdev()
rdd3.variance()
rdd3.histogram(3)
Applying Functions
rdd.map(lambda x:x+(x[1],x[0]))
rdd5=rdd.flatMap(lambda x:x+(x[1],x[0]))
rdd5.collect()
rdd4.flatMapValues(lambda x:x).collect()
Mathematical Operations
rdd.subtract(rdd2) # return elements of rdd that are not in rdd2
rdd2.subtractByKey(rdd) # return key/value pairs of rdd2 whose key is not in rdd
rdd.cartesian(rdd2).collect()
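For reference, with rdd and rdd2 as defined above, these calls should yield roughly the following (element ordering may vary):
rdd.subtract(rdd2).collect() # [('a', 7), ('b', 2)]
rdd2.subtractByKey(rdd).collect() # [('d', 1)]
rdd.cartesian(rdd2).count() # 9 pairs (3 x 3)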
Sorting
rdd2.sortBy(lambda x:x[1]).collect()
rdd2.sortByKey()
Reshaping Data
rdd.repartition(4)
rdd.coalesce(1)
Saving
rdd.saveAsTextFile("rdd.txt")
rdd.saveAsHadoopFile("hdfs://xxxxx.",'org.apache.hadoop.mapred.TextOutputFormat')
Stopping SparkContext
sc.stop()
Execution
$ ./bin/spark-submit examples/src/main/python/pi.py