0%

使用pyspark计算一点和周围距离最近的前100点位置

业务场景

有一批地理位置存储在postgres数据库表中为表1,有另一张基础表覆盖了地区所有的地理位置为表2,现在需要计算出表1中所有位置最近的前100个地点,并进行排序,最终在数据库中生成一张新表,表中包含目标地点的所有topN最近点的hash

使用工具

  • pyspark
  • postgres

实现过程

由于数据量过大所以需要将数据放在spark上运行,使用spark的dataframe数据格式进行数据处理比较便捷(spark2.0已经使用dataframe替代rdd)

  • 通过spark jdbc连接数据库,执行SQL生成dataframe格式数据,其中关键点在于如何通过SQL计算出topN距离(一开始我准备通过for循环,在代码中遍历求值,后来发现效率太低,而且需要通过spark udf将方法注册进spark),通过SQL进行计算范围两个步骤(1)计算出距离(2)对距离进行排序,排序时需要显性表示降序还是升序(3)对数据进行筛选


​ 以下是SQL,其中txpop是基础地理信息表,test_by_zhangrui是目标地理信息表,由于不能使用group by聚合函数, —– 所以换了一种方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
SELECT a.choose_site_hash,a.filtrate_site_hash,a.distance FROM
(SELECT
tmp2.hash1 as choose_site_hash,
txpop.grid_hash as filtrate__site_hash,
st_distance (tmp2.geo1 :: geography, txpop.geo :: geography) as distance,
ROW_NUMBER() OVER(PARTITION by tmp2.hash1 ORDER BY (st_distance (tmp2.geo1 :: geography, txpop.geo :: geography)) ASC) as n
FROM
(
SELECT
grid_hash AS hash1,
geo AS geo1
FROM
test_by_zhangrui
) tmp2,
txpop
) a
WHERE n<=10


1
2
3
4
5
6
7
8
9
10
11
12
13
14
def get_dataframe_jdbc(a):
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.master("local[*]") \
.getOrCreate()
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://*****") \
.option("dbtable", a) \
.option("user", "*****") \
.option("password", "*****") \
.load()
return jdbcDF

  • 执行SQL后,spark返回一个spark dataframe格式数据,我们需要将dataframe并入数据库做持久化
1
2
3
4
5
6
7
8
9
def write_dataframe_jdbc(table_name, result):
result.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://******") \
.option("dbtable", table_name) \
.option("user", "*******") \
.option("password", "*******") \
.save()
print("表生成完毕")
  • 最后我们可以在数据库中得到我们想要的表(目前spark所有配置都是默认配置没有优化,包括SQL也没有优化,待日后进行优化)

Top K问题

1、在一组非常大量的数据中找出前100大的数据

(1)每次选出101个数进行排序,最后一个数删除,多次排序(垃圾算法,时间复杂度高)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.zhangrui.sort;

import java.util.Arrays;
import java.util.Random;

/**
* @author 张睿
* @date 2018/11/17 9:24
*/
public class Top100 {
public static void main(String[] args) {
long[] arr=new long[101];
Random r=new Random();
for(int i=0;i<100000;i++){
arr[0]=r.nextLong();
Arrays.sort(arr);
}
for(int i=1;i<arr.length;i++){
long l=arr[i];
System.out.println(l);
}
}
}

(2)大顶堆,优先队列 PriorityQueue,建立一个大小为K的队列(可以直接使用java中的优先队列 PriorityQueue,基于 优先级堆的极大优先级队列,队列中元素默认按照自然顺序排序,数字就是默认最小的放在队列头)如果队列中有空位可以直接添加元素,如有元素个数达到K,将要添加的元素与优先队列中的队首(也就是优先队列中最小的数)进行比较,选出最小的元素放在队列头

  • 时间复杂度为O(nlogk)

  • 队列中方法说明:

    **offer()**方法往队列添加元素如果队列已满直接返回false,队列未满则直接插入并返回true

    **add()**方法是对offer()方法的简单封装.如果队列已满,抛出异常new IllegalStateException(“Queue full”);

    **put()**方法往队列里插入元素,如果队列已经满,则会一直等待直到队列为空插入新元素,或者线程被中断抛出异常 **remove()**方法直接删除队头的元素:

    **peek()**方法直接取出队头的元素,并不删除.

    **element()**方法对peek方法进行简单封装,如果队头元素存在则取出并不删除,如果不存在抛出异常NoSuchElementException()

    **poll()**方法取出并删除队头的元素,当队列为空,返回null

    ​ **take()**方法取出并删除队头的元素,当队列为空,则会一直等待直到队列有新元素可以取出,或者线程被中断抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.zhangrui.sort;

import java.util.*;

/**
* @author 张睿
* @date 2018/11/17 11:30
* 使用固定容量的优先队列实现TopK问题
*/
public class Top100Test02 {
private PriorityQueue<Integer> queue;
private int k;//最大容量
public Top100Test02(int maxSize){
if (maxSize<0){
throw new IllegalArgumentException();
}
this.k=maxSize;
this.queue=new PriorityQueue<>(maxSize);
}

public void addElement(Integer e){
if (queue.size()<k){
queue.add(e);
}else {
Integer peek=queue.peek();//取得堆中最小元素
if(e<peek){
queue.poll();
queue.add(e);
}
}
}
public static void main(String[] args) {
final Top100Test02 TopQueue=new Top100Test02(10);//返回前十位
Random random=new Random();
int rNum=0;
System.out.println("100个0~999之间的随机数");
for (int i=1;i<=100;i++){
rNum=random.nextInt(1000);
TopQueue.addElement(rNum);
}
while (!TopQueue.queue.isEmpty()){
System.out.println(TopQueue.queue.poll());
}
}
}

(2)针对Top K最有效的算法是BFPRT算法(中位数的中位数算法)

  • 将n个元素每5个分为一组,分为n/5组,最后一组元素为n%5,有效组为n/5.

  • 取出每一组的中位数,最后一个组的不用计算中位数,任意排序方法

  • 将各组的中位数与数组开头的数据在组的顺序依次交换,这样每组的中位数就排在了每组数据的最左边

  • ‘递归调用中位数选择算法查找出所有中位数的中位数,设为X,偶数个中位数的情况下设定为选取中间小的一个

  • 按照X大小划分,大于或等于X的在X右边,小于X放在X左边

  • 得到X元素的下标i,i左边的元素都小于X,i右边的元素都大于或等于X

    若i==k,返回X

    若i<k,在小于X元素中递归查找第i小的元素

    若i>k,在大于等于X元素中递归查找第i-k小的元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.zhangrui.sort;

import java.util.Random;

/**
* @author 张睿
* @date 2018/11/17 15:58
* BFPRT算法
*/
public class Top100Test03 {
private static int[] a=null;
//冒泡排序
public static void sort(int first,int end){
for(int flag=first;flag<end;flag++){
for(int i=end;i>flag;i--){
if(a[i]<a[i-1]){
int t=a[i];
a[i]=a[i-1];
a[i-1]=t;
}
}
}
}
//求最小的k个数
//数组a中从a[p]到a[r]的元素按照x划分
public static int partitionModify(int p,int r,int x){
int i,j;
for(i=p,j=r;i<j;i++){
if(a[i]>=x){
while (i<j&&a[j]>=x){
j--;
}
if(i!=j){
int t=a[i];
a[i]=a[j];
a[j]=t;
j--;
}else {
break;
}
}
}
if(a[i]>=x&&i>p){
return i-1;
}
return i;
}

//将r-p+1个元素按5个元素分为一组,分别找出各组的中位数
//再通过迭代求出中位数的中位数
public static int selectModify(int p,int r,int k){
int i;
if(r-p+1<=5){
sort(p,r);
return a[p+k-1];
}
//找出中位数
for(i=0;i<(r-p+1)/5;i++){
int s=p+5*i,t=s+4;//获得所划分的组中头元素和尾元素的下标
sort(s,t);//组内排序
int temp=a[p+i];//将中位数放在组内首元素位置
a[p+i]=a[s+2];
a[s+2]=temp;
}
int x=selectModify(p,p+(r-p+1)/5-1,(r-p+6)/10);
i=partitionModify(p,r,x);
int j=i-p+1;
if (k <= j) {
return selectModify(p, i, k);
}
else {
return selectModify(i + 1, r, k - j);
}
}

public static void main(String[] args) {
Random random=new Random();
System.out.println("100个0~999之间的随机数");
for (int i=0;i<100;i++){
a[i]=random.nextInt(1000);
}
System.out.println(selectModify(0,99,10));
}
}

初入SpringMVC遇到的坑

这几天在看SpringMVC,按照书上写入门案例,然而踩到了无数的坑。。。。。

1、 导包问题

刚开始我是规规矩矩的使用Maven导包,然而在运行程序时却一直报包丢失的错误

弄得很烦,在网上看了一下,发现Maven Web项目有时候需要自己手动将包导入项目

解决方法

在项目右键Properties 选择Deployment Assembly 点击 Add 选择 Java Build Path Entries

点击next 选择maven包 apply然后finish

2、maven问题

紧接着又遇到了maven包下载不完全的问题,有的时候网络情况不好的情况下,maven

下载包有时候会缺失

解决方法

我手动从网上下载了SpringMVC的包(主要是没有找到比较好的解决方法)

3、java 版本问题

这个问题让我头疼了好几天,就是因为在自己电脑上配置时没有出现问题,可是回到教室的电脑上

问题就出现了,显示为【org.springframework.web.servlet.PageNotFound】在你输入对应的URI时

总是显示【No mapping found for HTTP request with URI…….】,最后发现是java版本的问题好像在jdk1.6

就会出现此问题

解决方法

在eclipse中,右击项目properties,选择 Java Compiler将其版本改为1.7

最终结果

历经了一个半星期我终于将一个比较初级SSM框架搭建成功,实现了由URI返回id实现对数据库的操作

  • 一定要注意java开发环境的JDK要和配置的JDK保持一致,不然Tomcat会出现各种问题

数据库好难

今天看了一回儿数据库的丢失更新,顿时感觉数据库真的满抽象的,以前以为数据库就是增删改查,今天算是见识了,头都看大了


在高并发情景下经常发生数据库丢失更新的情况,事务之间什么时候该同步,什么时候不该同步,多个事务之间的权限管理真的很复杂。一般采用隔离级别来减少出现丢失更新的可能性,至于乐观锁和悲观锁,这个要等我看完了再说。
SQL标准定义了隔离级别有以下四层:

  • 脏读:最低的隔离级别,允许一个事务去读取另一个事务中未提交的数据。
  • 读/写操作:一个事务只能读取另一个事务已经提交的数据
  • 可重复读:针对数据库同一条记录而言,可重复读会使同一条数据库记录的读/写按照一个序列化进行操作,不会产生交叉情况,从而保证同一条数据的一致性
  • 序列化:让SQL按照顺序读/写的方式,能消除数据库事务之间并发产生数据不一致的问题。
***各类隔离级别和产生的现象***

脏读(产生影响):脏读,不可重读,幻读
读/写提交(产生影响):脏读,不可重读,幻读
可重复读(产生影响):幻读
序列化(产生影响):(没有)

MapReduce运行步骤

1、【input阶段】获取输入数据进行分片作为map的输入
2、【map阶段】过程对某种输入格式的一条记录解析一条或多条记录
3、【shffle阶段】对中间数据的控制,作为reduce的输入
4、【reduce阶段】对相同key的数据进行合并
5、【output阶段】按照格式输出到指定目录

MapReduce作业(job):

1
客户端需要执行的一个工作单元,它包括输入数据,MapReduce程序和配置信息

任务(task):

1
Hadoop将作业分成若干个小任务来执行,其中包括两类任务:map任务和reduce任务。这些任务由yarn进行调度并运行在集群的节点上

数据块(block):

1
HDFS上的文件被划分为大小相同的数据块,作为独立的存储单元,数据块的默认大小为128m

输入分片(input split):

1
2
3
4
Hadoop将MapReduce的数据划分为等长的小数据块,输入分片,或者简称“分片”。InputSplit分为两种情况:
-如果数据记录位于一个数据块中,InputSplit可表示完整的数据记录集,例如第一个InputSplit
-如果数据记录跨两个数据块,InputSplit中会包含第二个数据块的位置以及所需完整数据的偏移量。如第2-5个InputSplit。
默认情况下InputSprit与block大小一致

Map阶段的运行结果是保存在本地文件上的

hive组件

1
2
3
4
5
1、用户接口:包括Hive Shell 、Thrift客户端 、Web端口等
2、Thrift服务器:当Hive以服务器模式运行时,可以作为Thrift服务器供客户端连接
3、解析器:解析并执行HiveQL语句,由解释器、编译器、优化器和执行器组成
4、元数据库存储Hive元数据,通常使用MySQL或Derby
5、Hadoop数据仓库存储于HDFS上,并由MapReduce执行解析器生成的查询计划

HiveQL

1、HiveQL与大多数SQL语法兼容
2、HiveQL并不完全支持SQL标准,如:不支持更新操作和事务。由hbase底层决定

unix哲学

Write programs that do one thing and do it well.

做一件事,并且做好。

Write programs to work together.

相互合作,一起工作。

Write programs to handle text streams, because that is a universal interface.

处理文本流,因为它通用。

Rule of Modularity: Write simple parts connected by clean interfaces.

写简单的接口,简单的模块。

Rule of Clarity: Clarity is better than cleverness.

清晰比聪明好。

Rule of Composition: Design programs to be connected to other programs.

考虑复用。

Small is beautiful.

简单即优美。

Hbase组件

Client

1
1、包含访问HBase的接口并维护cache来加快对HBase的访问

Zookeeper

1
2
3
4
1、保证任何时候,集群中只有一个master
2、保存所有Region的寻址入口
3、实时监控Region Server的上线和下线信息。并实时通知给Master
4、存储HBase的schema和table元数据

Master

1
2
3
4
1、为Region Server分配region
2、负责Region Server的负载均衡
3、发现失效的RegionSever并重新分配其上的region
4、管理用户对table的增删改查操作

RegionServer

1
2
1、维护region,处理对这些region的I/O请求
2、负责切分在运行过程中变得过大的region

Hbase存储结构

Hbase和关系型数据库的区别

1
2
关系型数据库按行存储(.csv文件)
HBase的存储结构基于HDFS

Region

1
2
3
4
5
6
1、每个Region对应一个HRegion实例,这些实例被HRegionServer管理
一个表的所有Region会分布在不同的region服务器上被管理,但一个Region
内的数据只会被一个服务器所管理。
2、物理上数据存储在HDFS上,由Region服务器提供数据服务。
每个Region由一个或多个Store组成,每个Store保存一个列族的所有数据。
3、Store是Hbase存储的核心,每个Store由一个MemStore和零至多个StoreFile组成,StoreFile以HFile的格式存储在HDFS上。`

store

1
2
3
4
5
1、store按列族划分
2、StoreFile:按列划分
3、HFile:实际存入hdfs的文件,同一列的数据会被存在同一节点
4、HLog用于灾难备份,使用了预写式日志(WAL)
5、每个Region服务器对应一个Hlog,所以来自不同表的region日志是混在一起的,这样做的目的是写入日志时只需要追加单个文件即可。

介绍一下Hbase,为什么使用Hbase,Hbase的运行机制

QuickStart

Big Table:分布式数据存储系统

1
Hbase:Big Table 的Java实现(非关系型数据库)

主要特点:

    大:一个表可以包含上亿行,上百万列
    面向列:面向列的存储和权限控制,列的独立检索
    稀疏:对于为空的列,不占用存储空间
    针对每个CELL的值保存多个版本由时间戳来标识    
    每个表由行和列组成
    每行由‘行键’来唯一标识,默认存在
    每个列属于一个特定的列族(Column Family)
    表中由行和列确定的存储单元称为一个单元(Cell)
    每个单元保存了同一份数据的多个版本,由时间戳(TimeStamp)来标识
        

副本的控制:

1
副本的保存策略可以在建表时动态指定,默认只保存一个副本
(1) 按时间控制:可以设定保存的副本的时间长度,超过该时间的部分将被删除
(2)按最大副本数量来控制,比如说每个CULL就保存七个副本,当存入第八个副本时,第一个存入的副本将会被删除

Column(Hbase提供的一个列族的概念)

Family,当创建一个表格时,仅需要指定所有的列族,并不需要指定具体的列

Hbase的数据存储是以行为单位的,若干行的数据被划分一个region,该region内按列族划分为Store,一个Store内部按列(Qualifier限定符)划分为多个StoreFile,StoreFile物理上对应HFile文件,HBase存储时,是以HFile为单位进行分布存储的,因此同一列的数据会被优先保存在同一个节点上
这样的设计有利于数据分析。